12 Spark on RDD 分区器
RDD 分区器
Spark 目前支持 Hash 分区和 Range 分区,和用户自定义分区。Hash 分区为当前的默认分区。分区器直接决定了 RDD 中分区的个数、RDD 中每条数据经过 Shuffle 后进入哪个分区,进而决定了 Reduce 的个数。只有 Key-Value 类型的 RDD 才有分区器,非 Key-Value 类型的 RDD 分区的值是 None每个 RDD 的分区 ID 范围:0 ~ (numPartitions - 1),决定这个值是属于那个分区的。
Hash 分区:对于给定的 key,计算其 hashCode,并除以分区个数取余。
class HashPartitioner(partitions: Int) extends Partitioner {
require(partitions >= 0, s"Number of partitions ($partitions) cannot be
negative.")
def numPartitions: Int = partitions
def getPartition(key: Any): Int = key match {
case null => 0
case _ => Utils.nonNegativeMod(key.hashCode, numPartitions)
}
override def equals(other: Any): Boolean = other match {
case h: HashPartitioner =>
h.numPartitions == numPartitions
case _ =>
false
}
override def hashCode: Int = numPartitions
}
Range 分区:将一定范围内的数据映射到一个分区中,尽量保证每个分区数据均匀,而且分区间有序
class RangePartitioner[K : Ordering : ClassTag, V](
partitions: Int,
rdd: RDD[_ <: Product2[K, V]],
private var ascending: Boolean = true)
extends Partitioner {
// We allow partitions = 0, which happens when sorting an empty RDD under the
default settings.
require(partitions >= 0, s"Number of partitions cannot be negative but found
$partitions.")
private var ordering = implicitly[Ordering[K]]
// An array of upper bounds for the first (partitions - 1) partitions
private var rangeBounds: Array[K] = {
...
}
def numPartitions: Int = rangeBounds.length + 1
private var binarySearch: ((Array[K], K) => Int) =
CollectionsUtils.makeBinarySearch[K]
def getPartition(key: Any): Int = {
val k = key.asInstanceOf[K]
var partition = 0
if (rangeBounds.length <= 128) {
// If we have less than 128 partitions naive search
while (partition < rangeBounds.length && ordering.gt(k,
rangeBounds(partition))) {
partition += 1
}
} else {
// Determine which binary search method to use only once.
partition = binarySearch(rangeBounds, k)
// binarySearch either returns the match location or -[insertion point]-1
if (partition < 0) {
partition = -partition-1 }
if (partition > rangeBounds.length) {
partition = rangeBounds.length
} }
if (ascending) {
partition
} else {
rangeBounds.length - partition
} }
override def equals(other: Any): Boolean = other match {
...
}
override def hashCode(): Int = {
...
}
@throws(classOf[IOException])
private def writeObject(out: ObjectOutputStream): Unit =
Utils.tryOrIOException {
...
}
@throws(classOf[IOException])
private def readObject(in: ObjectInputStream): Unit = Utils.tryOrIOException
{
...
} }
12 Spark on RDD 分区器相关推荐
- Spark的RDD分区器
RDD 分区器 基本介绍 Spark 目前支持Hash 分区.Range 分区和用户自定义分区.Hash 分区为当前的默认分区.分区器直接决定了RDD 中分区的个数.RDD 中每条数据经过Shuffl ...
- 【spark】RDD分区解析
文章目录 前言 一.集合分区 1.设置分区 2. 数据分配源码解析 二.文件分区 1.设置分区 2.Hadoop 读取文件1.1概念 前言 RDD(Resilient Distributed Data ...
- spark RDD分区2GB限制(Size exceeds Integer.MAX_VALUE)
最近使用spark处理较大的数据文件,遇到了分区2G限制的问题,spark日志会报如下的日志: WARN scheduler.TaskSetManager: Lost task 19.0 in sta ...
- Spark的RDD转换算子
目录 RDD转换算子 Value 类型 1. map 2. mapPartitions map 和mapPartitions 的区别 3. mapPartitionsWithIndex 4. flat ...
- RDD分区2GB限制
本文目的 最近使用spark处理较大的数据时,遇到了分区2G限制的问题(ken).找到了解决方法,并且在网上收集了一些资料,记录在这里,作为备忘. 问题现象 遇到这个问题时,spark日志会报如下的日 ...
- Spark:RDD编程总结(概述、算子、分区、共享变量)
目录 1.RDD概述 1.1.RDD是什么 1.2.RDD的弹性 1.3.RDD的特点 1.3.1.分区 1.3.2.只读 1.3.3.依赖 1.3.4.缓存 1.3.5.检查点 2.RDD编程 2. ...
- spark更改分区_spark RDD分区是否可以指定分区
更多详细内容 数据分区: 在分布式集群里,网络通信的代价很大,减少网络传输可以极大提升性能. mapreduce框架的性能开支主要在io和网络传输,io因为要大量读写文件,它是不可避免的,但是网络传输 ...
- Spark分区器HashPartitioner和RangePartitioner代码详解
转载: https://www.iteblog.com/archives/1522.html 在Spark中分区器直接决定了RDD中分区的个数:也决定了RDD中每条数据经过Shuffle过程属于哪个分 ...
- Spark自定义分区器
spark目前支持两个分区器,分别是HashPartitioner和RangePartitioner. 均继承自Partitioner,有共同方法 - def numPartitions --分区器的 ...
最新文章
- verilog中如何拆分一个数
- Leetcode 之Count and Say(35)
- Prism安装、MVVM基础概念及一个简单的样例
- 企业网站 源码 服务邮箱:_企业网站建设对于服务器的选择至关重要
- 第 3 章 镜像 - 015 - 调试 Dockerfile
- 深入分析glibc内存释放时的死锁bug
- NUC980开发板Linux系统EC20模块 移植 串口 PPP拨号
- [高级软件调试方法] SoftProbe调试方法及实现
- c语言判断二叉树是不是二叉排序树_判断
- 用EEupdate修改Intel网卡类型
- 百度旋转验证码打码模块,集成鱼刺模块类
- 自签名证书和私有CA签名的证书的区别
- 量子计算与通讯的基本原理(量子纠缠)
- 美国大学工程计算机排名,想从事编程?2019年USNews美国大学计算机工程专业排名值得一看...
- Kruise Rollout:灵活可插拔的渐进式发布框架
- 出现这十种症状,说明你不适合干程序员
- teamviewer14 去商用途提示
- 编写训练一年级学生10以内减法的程序
- OpenCV实战之人脸美颜美型(四)——肤色检测
- 服务器不能复制粘贴文件的处理方式
热门文章
- Netsuite案例:达美乐比萨
- Armbain系统根分区空间不足处理
- C语言 立方体随鼠标转动,HTML5 盒子悬停动效 - 立方体沿鼠标方向翻滚
- 成长笔记2:用这15句话夸孩子,决定他们一生思维高度
- 谷歌金山词霸中自定义TTS语音的实现(告别难听的默认男声!)
- UbuntuKylin 安装教程
- Linux常用命令(Hadoop)
- [附源码]计算机毕业设计JAVA网上花店系统
- Java使用 Delayed 实现延迟任务
- int a; int* a; int** a; int (*a)[]; int (*a)(int)