我们都知道Spark内部提供了HashPartitionerRangePartitioner两种分区策略(这两种分区的代码解析可以参见:《Spark分区器HashPartitioner和RangePartitioner代码详解》),这两种分区策略在很多情况下都适合我们的场景。但是有些情况下,Spark内部不能符合咱们的需求,这时候我们就可以自定义分区策略。为此,Spark提供了相应的接口,我们只需要扩展Partitioner抽象类,然后实现里面的三个方法:

package org.apache.spark
/**
 * An object that defines how the elements in a key-value pair RDD are partitioned by key.
 * Maps each key to a partition ID, from 0 to `numPartitions - 1`.
 */
abstract class Partitioner extends Serializable {
  def numPartitions: Int
  def getPartition(key: Any): Int
}

  def numPartitions: Int:这个方法需要返回你想要创建分区的个数;
  def getPartition(key: Any): Int:这个函数需要对输入的key做计算,然后返回该key的分区ID,范围一定是0到numPartitions-1
  equals():这个是Java标准的判断相等的函数,之所以要求用户实现这个函数是因为Spark内部会比较两个RDD的分区是否一样。

  假如我们想把来自同一个域名的URL放到一台节点上,比如:https://www.iteblog.comhttps://www.iteblog.com/archives/1368,如果你使用HashPartitioner,这两个URL的Hash值可能不一样,这就使得这两个URL被放到不同的节点上。所以这种情况下我们就需要自定义我们的分区策略,可以如下实现:

package com.iteblog.utils
import org.apache.spark.Partitioner
/**
 * User: 过往记忆
 * Date: 2015-05-21
 * Time: 下午23:34
 * bolg: https://www.iteblog.com
 * 本文地址:https://www.iteblog.com/archives/1368
 * 过往记忆博客,专注于hadoop、hive、spark、shark、flume的技术博客,大量的干货
 * 过往记忆博客微信公共帐号:iteblog_hadoop
 */
class IteblogPartitioner(numParts: Int) extends Partitioner {
  override def numPartitions: Int = numParts
  override def getPartition(key: Any): Int = {
    val domain = new java.net.URL(key.toString).getHost()
    val code = (domain.hashCode % numPartitions)
    if (code < 0) {
      code + numPartitions
    } else {
      code
    }
  }
  override def equals(other: Any): Boolean = other match {
    case iteblog: IteblogPartitioner =>
      iteblog.numPartitions == numPartitions
    case _ =>
      false
  }
  override def hashCode: Int = numPartitions
}

因为hashCode值可能为负数,所以我们需要对他进行处理。然后我们就可以在partitionBy()方法里面使用我们的分区:

iteblog.partitionBy(new IteblogPartitioner(20))

  类似的,在Java中定义自己的分区策略和Scala类似,只需要继承org.apache.spark.Partitioner,并实现其中的方法即可。

  在Python中,你不需要扩展Partitioner类,我们只需要对iteblog.partitionBy()加上一个额外的hash函数,如下:

import urlparse
def iteblog_domain(url):
  return hash(urlparse.urlparse(url).netloc)
iteblog.partitionBy(20, iteblog_domain)

本文转自https://www.iteblog.com/archives/1368.html,所有权力归原作者所有,侵权请联系删除

Spark自定义分区(Partitioner)相关推荐

  1. Spark自定义分区器

    spark目前支持两个分区器,分别是HashPartitioner和RangePartitioner. 均继承自Partitioner,有共同方法 - def numPartitions --分区器的 ...

  2. Spark数据分区(partitionBy分区、partitioner获取分区方式、自定义分区)

    数据分区 partitionBy分区 在分布式程序中,通信的代价是很大的,因此控制数据分布以获得最少的网络传输可以极大地提升整体性能.和单节点的程序需要为记录集合选择合适的数据结构一样,Spark 程 ...

  3. 自定义实现spark的分区函数

    有时自己的业务需要自己实现spark的分区函数 以下代码是实现一个自定义spark分区的demo 实现的功能是根据key值的最后一位数字,写到不同的文件 例如: 10写入到part-00000 11写 ...

  4. 21,spark sql 测试 : 1.4G 文件实战,测试耗时多少,先分区,再在分区内计算,用列内容分区( 这是一个很魔幻的问题 ),自定义分区

    一 ,常规问题 : 1 ,表关联,数据过滤 : sql select stock.area,goods.smallLei,goods.typeColorId, weekofyear(to_date(s ...

  5. IDEA本地运行Spark项目[演示自定义分区器]并查看HDFS结果文件

    文章目录 一.提出问题 二.解决问题 (一)添加IP到主机名的映射 (二)在本地准备Spark库文件 (三)在IDEA里创建Scala项目 (四)添加Spark库文件到项目 (五)创建自定义分区器 ( ...

  6. 【大数据开发】SparkCore——自定义排序、实现序列化、自定义分区器

    文章目录 一.自定义排序四种方式.实现序列化 二.案例:自定义分区器 一.自定义排序四种方式.实现序列化 前面两种是样例类实现.普通类实现 第三种方式可以不实现序列化接口 用的最多的还是第四种方式,第 ...

  7. kafka模拟生产-消费者以及自定义分区

    2019独角兽企业重金招聘Python工程师标准>>> 基本概念 kafka中的重要角色   broker:一台kafka服务器就是一个broker,一个集群可有多个broker,一 ...

  8. java kafka 分区_Java kafka如何实现自定义分区类和拦截器

    生产者发送到对应的分区有以下几种方式: (1)指定了patition,则直接使用:(可以查阅对应的java api, 有多种参数) (2)未指定patition但指定key,通过对key的value进 ...

  9. kafka自定义分区实战

    本文来说下kafka自定义分区相关的知识与内容,同时说下springboot整合kafka如何来实现自定义分区 文章目录 Kafka如何实现分区 Kafka集群是如何知道投递到哪个broker中 默认 ...

最新文章

  1. Bug改到怀疑人生…… | 每日趣闻
  2. Paddle内置的网络模型
  3. Spring和JSF集成:选择项目
  4. python有null吗_Python世界中, 有Java中那种 @NotNull 吗( JetBrains提供的 ) ?
  5. mysql sum 删除_如果mysql磁盘满了,会发生什么?还真被我遇到了~
  6. LINUX UBUNTU安装依赖库编译freeswitch
  7. 系统集成项目管理工程师和PMP®的对比
  8. 【MisakaHookFinder使用方法】关于如何提取一个文字游戏的文本钩子以供翻译的方法
  9. python数据笔记分析_python数据分析入门学习笔记
  10. CAD显示全屏控件(网页版)
  11. mininet-ovs转发行为与流表不对应
  12. 如何设置无线路由连接无线wifi
  13. 这种股权结构一定要远离!
  14. Hash签名算法入门
  15. 安全(Security)设计原则(1)
  16. 笔记 ngrok 内网穿透及其身份认证 authtoken 配置
  17. [附源码]java毕业设计宠物商店管理系统
  18. matplotlib更改窗口图标
  19. 暗黑2浴火重生zclient注册账号
  20. 管理类联考——写作——素材篇——论说文——写作素材01—志篇:理想•信念

热门文章

  1. 实现一个简单的银行转账操作
  2. AnalogClock和DigitalClock时间和日期控件
  3. C语言课后习题(49)
  4. java 建立一个graphics对象_java – 我应该显式处理Graphics对象吗?
  5. 数据库实验一(MySQL基本操作命令总结)
  6. 周四下午3小时,4个行业分享,尽在信创行业发展高端研讨会数据库专场
  7. 46个PPT下载丨QCon 2019年全球软件开发大会PPT
  8. 【假期重磅福利】更新三个Oracle系列课程,共153课时,最低免费获取
  9. 快讯:Oracle 19c 新特性及官方文档抢鲜下载
  10. MySQL 数据库“十宗罪”(十大经典错误案例)