Spark的RDD分区器
RDD 分区器
基本介绍
Spark 目前支持Hash 分区、Range 分区和用户自定义分区。Hash 分区为当前的默认分区。分区器直接决定了RDD 中分区的个数、RDD 中每条数据经过Shuffle 后进入哪个分区,进而决定了Reduce 的个数。
(1)只有Key-Value 类型的RDD 才有分区器,非 Key-Value 类型的RDD 分区的值是 None。
(2)每个RDD 的分区 ID 范围:0 ~ (numPartitions - 1),决定这个值是属于那个分区的。
1. Hash 分区
说明
对于给定的 key,计算其hashCode,并除以分区个数取余。
源码
class HashPartitioner(partitions: Int) extends Partitioner { require(partitions >= 0, s"Number of partitions ($partitions) cannot be
negative.") def numPartitions: Int = partitions def getPartition(key: Any): Int = key match { case null => 0 case _ => Utils.nonNegativeMod(key.hashCode, numPartitions) } override def equals(other: Any): Boolean = other match { case h: HashPartitioner => h.numPartitions == numPartitions case _ => false } override def hashCode: Int = numPartitions
}
2. Range分区
说明
将一定范围内的数据映射到一个分区中,尽量保证每个分区数据均匀,而且分区间有序。
源码
class RangePartitioner[K : Ordering : ClassTag, V]( partitions: Int, rdd: RDD[_ <: Product2[K, V]], private var ascending: Boolean = true) extends Partitioner { // We allow partitions = 0, which happens when sorting an empty RDD under the
default settings. require(partitions >= 0, s"Number of partitions cannot be negative but found
$partitions.") private var ordering = implicitly[Ordering[K]] // An array of upper bounds for the first (partitions - 1) partitions private var rangeBounds: Array[K] = { ... } def numPartitions: Int = rangeBounds.length + 1 private var binarySearch: ((Array[K], K) => Int) = CollectionsUtils.makeBinarySearch[K] def getPartition(key: Any): Int = { val k = key.asInstanceOf[K] var partition = 0 if (rangeBounds.length <= 128) { // If we have less than 128 partitions naive search while (partition < rangeBounds.length && ordering.gt(k,
rangeBounds(partition))) { partition += 1 } } else { // Determine which binary search method to use only once. partition = binarySearch(rangeBounds, k) // binarySearch either returns the match location or -[insertion point]-1 if (partition < 0) { partition = -partition-1 } if (partition > rangeBounds.length) { partition = rangeBounds.length } } if (ascending) { partition } else { rangeBounds.length - partition } } override def equals(other: Any): Boolean = other match { ... } override def hashCode(): Int = { ... } @throws(classOf[IOException]) private def writeObject(out: ObjectOutputStream): Unit =
Utils.tryOrIOException { ... } @throws(classOf[IOException]) private def readObject(in: ObjectInputStream): Unit = Utils.tryOrIOException
{ ... }
}
3. 用户自定义分区
说明
用户可以根据自己的需要,自定义分区个数。
案例实操
package com.atguigu.bigdata.spark.core.rdd.partimport org.apache.spark.rdd.RDD
import org.apache.spark.{Partitioner, SparkConf, SparkContext}object Spark01_RDD_Part {def main(args: Array[String]): Unit = {val sparkConf: SparkConf = new SparkConf().setMaster("local[*]").setAppName("Spark01_RDD_Part")val sc = new SparkContext(sparkConf)val rdd = sc.makeRDD(List(("nba","xxxxxxx"),("cba","xxxxxxx"),("wnba","xxxxxxx"),("nba","xxxxxxx")),3)val partRDD: RDD[(String, String)] = rdd.partitionBy(new MyPartitioner)partRDD.saveAsTextFile("output")sc.stop()}/*** 自定义分区器* 1. 继承 Partitioner* 2. 重写方法*/class MyPartitioner extends Partitioner {//分区数量override def numPartitions: Int = 3//根据数据的key值返回数据的分区索引(从0开始)override def getPartition(key: Any): Int = {key match {case "nba" => 0case "wnba" => 1case "cba" => 2case _ => 2}
// if (key == "nba"){// 0
// }else if ( key == "wnba"){// 1
// }else if (key == "cba"){// 2
// }else {// 2
// }}}}
Spark的RDD分区器相关推荐
- 12 Spark on RDD 分区器
RDD 分区器 Spark 目前支持 Hash 分区和 Range 分区,和用户自定义分区.Hash 分区为当前的默认分区.分区器直接决定了 RDD 中分区的个数.RDD 中每条数据经过 Shuffl ...
- 【spark】RDD分区解析
文章目录 前言 一.集合分区 1.设置分区 2. 数据分配源码解析 二.文件分区 1.设置分区 2.Hadoop 读取文件1.1概念 前言 RDD(Resilient Distributed Data ...
- spark RDD分区2GB限制(Size exceeds Integer.MAX_VALUE)
最近使用spark处理较大的数据文件,遇到了分区2G限制的问题,spark日志会报如下的日志: WARN scheduler.TaskSetManager: Lost task 19.0 in sta ...
- Spark的RDD转换算子
目录 RDD转换算子 Value 类型 1. map 2. mapPartitions map 和mapPartitions 的区别 3. mapPartitionsWithIndex 4. flat ...
- RDD分区2GB限制
本文目的 最近使用spark处理较大的数据时,遇到了分区2G限制的问题(ken).找到了解决方法,并且在网上收集了一些资料,记录在这里,作为备忘. 问题现象 遇到这个问题时,spark日志会报如下的日 ...
- spark更改分区_spark RDD分区是否可以指定分区
更多详细内容 数据分区: 在分布式集群里,网络通信的代价很大,减少网络传输可以极大提升性能. mapreduce框架的性能开支主要在io和网络传输,io因为要大量读写文件,它是不可避免的,但是网络传输 ...
- Spark分区器HashPartitioner和RangePartitioner代码详解
转载: https://www.iteblog.com/archives/1522.html 在Spark中分区器直接决定了RDD中分区的个数:也决定了RDD中每条数据经过Shuffle过程属于哪个分 ...
- Spark自定义分区器
spark目前支持两个分区器,分别是HashPartitioner和RangePartitioner. 均继承自Partitioner,有共同方法 - def numPartitions --分区器的 ...
- spark rdd分区与任务的关系
spark rdd分区与任务关系 rdd是弹性分布式数据集,分区是对rdd数据的划分.分区之后,job并行度增大.一个分区对应一个任务. 什么是任务,任务是job的执行逻辑单元.t ...
最新文章
- GET 和 POST请求的本质区别是什么?原来我一直理解错了
- python版本不同影响大不大_Python的不同版本对编程有影响吗
- python工程师薪资坑吗-完美起航-20201024——记录一下自己的前端工程师之路
- 《京东技术解密》读书笔记:坚持技术十年如一日
- 查看安卓模拟器 CPU版本
- 用户二次登陆,干掉第一次登录的session
- MySQL语句的语法
- 利用状态图实现词法分析
- JPA和CMT –为什么捕获持久性异常不够?
- Java未来路在何方?图文详解!
- 前端学习(588):console面板简介与交互式命令
- python格式化字符串%r_Python语法速查:3.字符串格式化
- 逆序对(洛谷P1908题题解,Java语言描述)
- 第二届广东省大学生网络攻防大赛 simple_re
- H5页面调用微信扫一扫
- Linux基础-进程管理
- 基于QT+Opencv的红眼去除
- 如何不靠运气变得富有 (十一) —— 选择智慧、精力充沛和正直的合作伙伴
- ubuntu安装和卸载软件命令
- 什么是PDF(便携式文档格式)文件以及如何打开PDF?