RDD 分区器

基本介绍

Spark 目前支持Hash 分区、Range 分区和用户自定义分区。Hash 分区为当前的默认分区。分区器直接决定了RDD 中分区的个数、RDD 中每条数据经过Shuffle 后进入哪个分区,进而决定了Reduce 的个数。

(1)只有Key-Value 类型的RDD 才有分区器,非 Key-Value 类型的RDD 分区的值是 None。
(2)每个RDD 的分区 ID 范围:0 ~ (numPartitions - 1),决定这个值是属于那个分区的。

1. Hash 分区

说明

对于给定的 key,计算其hashCode,并除以分区个数取余。

源码

class HashPartitioner(partitions: Int) extends Partitioner { require(partitions >= 0, s"Number of partitions ($partitions) cannot be
negative.") def numPartitions: Int = partitions def getPartition(key: Any): Int = key match { case null => 0 case _ => Utils.nonNegativeMod(key.hashCode, numPartitions) } override def equals(other: Any): Boolean = other match { case h: HashPartitioner => h.numPartitions == numPartitions case _ => false } override def hashCode: Int = numPartitions
}

2. Range分区

说明

将一定范围内的数据映射到一个分区中,尽量保证每个分区数据均匀,而且分区间有序。

源码

class RangePartitioner[K : Ordering : ClassTag, V]( partitions: Int, rdd: RDD[_ <: Product2[K, V]], private var ascending: Boolean = true) extends Partitioner { // We allow partitions = 0, which happens when sorting an empty RDD under the
default settings. require(partitions >= 0, s"Number of partitions cannot be negative but found
$partitions.") private var ordering = implicitly[Ordering[K]] // An array of upper bounds for the first (partitions - 1) partitions private var rangeBounds: Array[K] = { ... } def numPartitions: Int = rangeBounds.length + 1 private var binarySearch: ((Array[K], K) => Int) = CollectionsUtils.makeBinarySearch[K] def getPartition(key: Any): Int = { val k = key.asInstanceOf[K] var partition = 0 if (rangeBounds.length <= 128) { // If we have less than 128 partitions naive search while (partition < rangeBounds.length && ordering.gt(k,
rangeBounds(partition))) { partition += 1 } } else { // Determine which binary search method to use only once. partition = binarySearch(rangeBounds, k) // binarySearch either returns the match location or -[insertion point]-1 if (partition < 0) { partition = -partition-1 } if (partition > rangeBounds.length) { partition = rangeBounds.length } } if (ascending) { partition } else { rangeBounds.length - partition } } override def equals(other: Any): Boolean = other match { ... } override def hashCode(): Int = { ... } @throws(classOf[IOException]) private def writeObject(out: ObjectOutputStream): Unit =
Utils.tryOrIOException { ... } @throws(classOf[IOException]) private def readObject(in: ObjectInputStream): Unit = Utils.tryOrIOException
{ ... }
}

3. 用户自定义分区

说明

用户可以根据自己的需要,自定义分区个数。

案例实操

package com.atguigu.bigdata.spark.core.rdd.partimport org.apache.spark.rdd.RDD
import org.apache.spark.{Partitioner, SparkConf, SparkContext}object Spark01_RDD_Part {def main(args: Array[String]): Unit = {val sparkConf: SparkConf = new SparkConf().setMaster("local[*]").setAppName("Spark01_RDD_Part")val sc = new SparkContext(sparkConf)val rdd = sc.makeRDD(List(("nba","xxxxxxx"),("cba","xxxxxxx"),("wnba","xxxxxxx"),("nba","xxxxxxx")),3)val partRDD: RDD[(String, String)] = rdd.partitionBy(new MyPartitioner)partRDD.saveAsTextFile("output")sc.stop()}/*** 自定义分区器* 1. 继承 Partitioner* 2. 重写方法*/class MyPartitioner extends Partitioner {//分区数量override def numPartitions: Int = 3//根据数据的key值返回数据的分区索引(从0开始)override def getPartition(key: Any): Int = {key match {case "nba" => 0case "wnba" => 1case "cba" => 2case _ => 2}
//      if (key == "nba"){//        0
//      }else if ( key == "wnba"){//       1
//      }else if (key == "cba"){//        2
//      }else {//        2
//      }}}}

Spark的RDD分区器相关推荐

  1. 12 Spark on RDD 分区器

    RDD 分区器 Spark 目前支持 Hash 分区和 Range 分区,和用户自定义分区.Hash 分区为当前的默认分区.分区器直接决定了 RDD 中分区的个数.RDD 中每条数据经过 Shuffl ...

  2. 【spark】RDD分区解析

    文章目录 前言 一.集合分区 1.设置分区 2. 数据分配源码解析 二.文件分区 1.设置分区 2.Hadoop 读取文件1.1概念 前言 RDD(Resilient Distributed Data ...

  3. spark RDD分区2GB限制(Size exceeds Integer.MAX_VALUE)

    最近使用spark处理较大的数据文件,遇到了分区2G限制的问题,spark日志会报如下的日志: WARN scheduler.TaskSetManager: Lost task 19.0 in sta ...

  4. Spark的RDD转换算子

    目录 RDD转换算子 Value 类型 1. map 2. mapPartitions map 和mapPartitions 的区别 3. mapPartitionsWithIndex 4. flat ...

  5. RDD分区2GB限制

    本文目的 最近使用spark处理较大的数据时,遇到了分区2G限制的问题(ken).找到了解决方法,并且在网上收集了一些资料,记录在这里,作为备忘. 问题现象 遇到这个问题时,spark日志会报如下的日 ...

  6. spark更改分区_spark RDD分区是否可以指定分区

    更多详细内容 数据分区: 在分布式集群里,网络通信的代价很大,减少网络传输可以极大提升性能. mapreduce框架的性能开支主要在io和网络传输,io因为要大量读写文件,它是不可避免的,但是网络传输 ...

  7. Spark分区器HashPartitioner和RangePartitioner代码详解

    转载: https://www.iteblog.com/archives/1522.html 在Spark中分区器直接决定了RDD中分区的个数:也决定了RDD中每条数据经过Shuffle过程属于哪个分 ...

  8. Spark自定义分区器

    spark目前支持两个分区器,分别是HashPartitioner和RangePartitioner. 均继承自Partitioner,有共同方法 - def numPartitions --分区器的 ...

  9. spark rdd分区与任务的关系

    spark rdd分区与任务关系 rdd是弹性分布式数据集,分区是对rdd数据的划分.分区之后,job并行度增大.一个分区对应一个任务.           什么是任务,任务是job的执行逻辑单元.t ...

最新文章

  1. GET 和 POST请求的本质区别是什么?原来我一直理解错了
  2. python版本不同影响大不大_Python的不同版本对编程有影响吗
  3. python工程师薪资坑吗-完美起航-20201024——记录一下自己的前端工程师之路
  4. 《京东技术解密》读书笔记:坚持技术十年如一日
  5. 查看安卓模拟器 CPU版本
  6. 用户二次登陆,干掉第一次登录的session
  7. MySQL语句的语法
  8. 利用状态图实现词法分析
  9. JPA和CMT –为什么捕获持久性异常不够?
  10. Java未来路在何方?图文详解!
  11. 前端学习(588):console面板简介与交互式命令
  12. python格式化字符串%r_Python语法速查:3.字符串格式化
  13. 逆序对(洛谷P1908题题解,Java语言描述)
  14. 第二届广东省大学生网络攻防大赛 simple_re
  15. H5页面调用微信扫一扫
  16. Linux基础-进程管理
  17. 基于QT+Opencv的红眼去除
  18. 如何不靠运气变得富有 (十一) —— 选择智慧、精力充沛和正直的合作伙伴
  19. ubuntu安装和卸载软件命令
  20. 什么是PDF(便携式文档格式)文件以及如何打开PDF?

热门文章

  1. C标准库 limits.h
  2. hdu 5247 找连续数(思维)
  3. NVL 和NVL2函数
  4. 初探设计:Java继承何时用?怎么用?
  5. 查看Entity Framework生成的SQL语句
  6. jQuery判断获得的对象是否存在的方法
  7. DSP集成开发工具CCS的Git工具使用说明(一)
  8. CCS MAP文件说明
  9. C++获取当前时间和计算程序运行时间的方法
  10. php导出excel列数太多,php生成excel列名,超过26列大于Z问题解决办法