• spark目前支持两个分区器,分别是HashPartitioner和RangePartitioner.
  • 均继承自Partitioner,有共同方法
- def numPartitions --分区器的分区数量
- def getPartition(key: Any): Int  ---获取某一个key的分区号

HashPartitioner

Spark中非常重要的一个分区器,也是默认分区器,默认用于90%以上的RDD相关API上

功能:

  • 调用key的hashCode对分区数量取模,结果就是这个kv对的分区号
  • 当key为null的时候,分区号为0;
  • 分区器基本上适合所有RDD数据类型的数据进行分区操作;
  • 但是需要注意的是,由于JAVA中数组的hashCode是基于数组对象本身的,不是基于数组内容的,所以如果RDD的key是数组类型,那么可能导致数据内容一致的数据key没法分配到同一个RDD分区中,这个时候最好自定义数据分区器,采用数组内容进行分区或者将数组的内容转换为集合

源码HashPartitioner定义如下

class HashPartitioner(partitions: Int) extends Partitioner {require(partitions >= 0, s"Number of partitions ($partitions) cannot be negative.")
//分区的总数,可以利用构造函数从外部传入def numPartitions: Int = partitions
//依据key进行分区的划分,就是划分每个key属于哪个分区def getPartition(key: Any): Int = key match {case null => 0case _ => Utils.nonNegativeMod(key.hashCode, numPartitions)}override def equals(other: Any): Boolean = other match {case h: HashPartitioner =>h.numPartitions == numPartitionscase _ =>false}override def hashCode: Int = numPartitions
}

RangePartitioner

SparkCore中除了HashPartitioner分区器外,另外一个比较重要的已经实现的分区器,主要用于RDD的数据排序相关API中

比如sortByKey底层使用的数据分区器就是RangePartitioner分区器;

第一步:先从整个RDD中抽取出样本数据,将样本数据排序,计算出每个分区的最大key值,形成一个Array[KEY]类型的数组变量rangeBounds;
第二步:判断key在rangeBounds中所处的范围,给出该key值在下一个RDD中的分区id下标;该分区器要求RDD中的KEY类型必须是可以排序的,

自定义分区器实现按首字母分区

  1. 继承Partitioner
  2. 重写逻辑

需求,单词频率统计 aA-nN在0分区,其他字母开头的在1分区,其他在2分区

package src.main.scalaimport org.apache.spark.{Partitioner, SparkConf, SparkContext}
import org.apache.spark.rdd.RDDobject _PartitionDemo extends App {val conf = new SparkConf().setAppName("wordcount").setMaster("local")val sc = new SparkContext(conf)val lines: RDD[String] = sc.textFile("D:\\tmp", 2)val words: RDD[String] = lines.flatMap(_.split(" "))val tuples: RDD[(String, Int)] = words.map((_, 1))val myPartitioner = new WordCountPartition(3)val summed: RDD[(String, Int)] = tuples.reduceByKey(_ + _)//在此处调用自定义的分区器val sorted: RDD[(String, Int)] = summed.sortBy(_._2, ascending = false).partitionBy(myPartitioner)sorted.saveAsTextFile("output5")sc.stop()
}//自定义分区器,继承Partitioner
class WordCountPartition(numPartition: Int) extends Partitioner {//numPartitions代表分区的数目,可以利用构造函数传入,也可以设定为固定值override def numPartitions: Int = numPartition
//依据输入内容计算分区id(就是属于哪个分区)override def getPartition(key: Any): Int = {//key的类型为Any,需要转换为Stringvar str: String = key.toString//取字符串的第一个字符的内容val first: String = str.substring(0,1)//matches的参数为正则表达式if (first.matches("[A-Na-n]")) {0} else if (first.matches("[o-zO-Z]")) {1} else {2}}
}

自定义分区器处理如下文件

20161123101523   http://java.learn.com/java/javaee.shtml
20161123101523  http://java.learn.com/java/javaee.shtml
20161123101523  http://ui.learn.com/ui/video.shtml
20161123101523  http://bigdata.learn.com/bigdata/teacher.shtml
20161123101523  http://android.learn.com/android/video.shtml
20161123101523  http://h5.learn.com/h5/teacher.shtml
20161123101523  http://h5.learn.com/h5/course.shtml
20161123101523  http://bigdata.learn.com/bigdata/teacher.shtml
* 自定义分区器
* 需求:数据中有不同的学科,统计每个学科的访问量,将输出的一个学科生成一个文件**   (ui.learn.com,86)
import java.net.URL
import org.apache.spark.rdd.RDD
import org.apache.spark.{Partitioner, SparkConf, SparkContext}
import scala.collection.mutableobject _PartitionDemo2 extends App {val conf = new SparkConf().setAppName("SubjectDemo").setMaster("local")val sc = new SparkContext(conf)//读取文件进行切分val lines: RDD[(String, Int)] = sc.textFile("access.txt", 2).map(x => {//利用空格切分每一行  val fields: Array[String] = x.split("[\\s]+")//提取第二个字段val str: String = fields(1)//借用java的URL对象获取hostval url = new URL(str)val host: String = url.getHost//构建元组,1是为了方便后面统计(host, 1)})//统计个host的数量val sumed: RDD[(String, Int)] = lines.reduceByKey(_ + _).cache()//获取完全不同的host的数组,作为后续分区的依据,因为题目要求每个host分一个区val subjects: Array[String] = sumed.keys.distinct.collect//创建自定义分区器对象var partioner = new SubjectPartitioner(subjects)//调用自定义分区器val value: RDD[(String, Int)] = sumed.partitionBy(partioner)//输出结果value.saveAsTextFile("out3")sc.stop()
}
//创建自定义分区器.其中分区总数量由subjects计算而来,就是完全不同的host的数量
class SubjectPartitioner(subjects: Array[String]) extends Partitioner {//创建一个map用于存储学科和分区号val subject = new mutable.HashMap[String, Int]()var i = 0//实际构建map,实质创建key和分区号的对应关系for (s <- subjects) {subject += (s -> i)//分区号自增i += 1}override def numPartitions: Int = {//总分区数目subjects.length}//分区号等于key所对应的value的值override def getPartition(key: Any): Int = subject.getOrElse(key.toString, 0)
}

Spark自定义分区器相关推荐

  1. IDEA本地运行Spark项目[演示自定义分区器]并查看HDFS结果文件

    文章目录 一.提出问题 二.解决问题 (一)添加IP到主机名的映射 (二)在本地准备Spark库文件 (三)在IDEA里创建Scala项目 (四)添加Spark库文件到项目 (五)创建自定义分区器 ( ...

  2. 【大数据开发】SparkCore——自定义排序、实现序列化、自定义分区器

    文章目录 一.自定义排序四种方式.实现序列化 二.案例:自定义分区器 一.自定义排序四种方式.实现序列化 前面两种是样例类实现.普通类实现 第三种方式可以不实现序列化接口 用的最多的还是第四种方式,第 ...

  3. Hive自定义分区器流程

    Hive自定义分区器流程 1. 环境说明 当前环境采用Hadoop3.1.3以及Hive3.1.2版本! 2. 自定义类 自定义类实现org.apache.hadoop.mapred.Partitio ...

  4. hive自定义分区器

    Hive自定义分区器流程 1.自定义类 实现org.apache.hadoop.mapred.Partitioner(必须为这个,Hive中使用的是老的API)接口 package com.ailib ...

  5. Spark自定义分区(Partitioner)

    我们都知道Spark内部提供了HashPartitioner和RangePartitioner两种分区策略(这两种分区的代码解析可以参见:<Spark分区器HashPartitioner和Ran ...

  6. Spark的RDD分区器

    RDD 分区器 基本介绍 Spark 目前支持Hash 分区.Range 分区和用户自定义分区.Hash 分区为当前的默认分区.分区器直接决定了RDD 中分区的个数.RDD 中每条数据经过Shuffl ...

  7. 21,spark sql 测试 : 1.4G 文件实战,测试耗时多少,先分区,再在分区内计算,用列内容分区( 这是一个很魔幻的问题 ),自定义分区

    一 ,常规问题 : 1 ,表关联,数据过滤 : sql select stock.area,goods.smallLei,goods.typeColorId, weekofyear(to_date(s ...

  8. Spark分区器HashPartitioner和RangePartitioner代码详解

    转载: https://www.iteblog.com/archives/1522.html 在Spark中分区器直接决定了RDD中分区的个数:也决定了RDD中每条数据经过Shuffle过程属于哪个分 ...

  9. kafka自定义生产者分区器、自定义消费者分区器

    目录 1 默认分区 1.1 键key的作用 1.2 键的分区 2 生产者自定义分区 2.1 使用场景分析 2.2 自定义分区器要实现Partitioner接口 2.3 生产者使用分区器 3 消费者自定 ...

最新文章

  1. ThInkPHP验证码不显示,解决方法汇总
  2. Matter App提供了一个由BCH推动的长格式博客平
  3. 完全分布式部署Hadoop
  4. 暴力优化解法+哈希解法——2016年第七届蓝桥杯省赛b组第八题 四平方和
  5. Android使用Intent实现拨打电话的动作
  6. weblogic启动失败案例(root启动引起的权限问题)
  7. [vue]vue渲染模板时怎么保留模板中的HTML注释呢?
  8. SpringBoot使用RequestBodyAdvice进行统一参数处理
  9. 记录一次数据同步到数据仓库的架构与实践
  10. Emscripten 单词_分享15个英语单词记忆方法,简单实用,赶紧收藏吧!
  11. Linux之POSTFIX邮件服务
  12. 复选框式查询 例题租房子
  13. 关于求极限对几个问题的思考和总结
  14. 那些年我们一起错过赚钱时光 10年机会逐个数
  15. 回调地狱[Callback Hell]
  16. sqldbx连接Smartbi mysql知识库
  17. 2023联考管综论说文阅卷标准及提分计划:附2010-2022年真题
  18. 关于星巴克在故宫开店
  19. 华为荣耀平板5怎么样_荣耀平板5和华为m5青春版哪个好
  20. J2EE 框架结构及核心技术基础面面观

热门文章

  1. koa2 导出excel表格设置样式_一调整Excel表格的行列宽度,图片又得重新调整?点这个设置就行...
  2. python创建字符串_Python基础之字符串
  3. windows系统bat批处理 管理nginx启动 nginx脚本管理bat脚本管理生命周期windows一键nginx启动
  4. git学习 add - commit - init
  5. maxscale mysql5.7_Maxscale实现Mysql读写分离
  6. jni java参数签名,什么是“方法签名”参数调用使用JNI的Java方法?
  7. python界面颜色设置_pycharm修改界面主题颜色的方法
  8. 2019年安大计算机专业分数线,2019年安徽
  9. mysql 排序查询核心
  10. lambda函数if_现代 C++:Lambda 表达式