Spark自定义分区(Partitioner)
我们都知道Spark内部提供了HashPartitioner
和RangePartitioner
两种分区策略(这两种分区的代码解析可以参见:《Spark分区器HashPartitioner和RangePartitioner代码详解》),这两种分区策略在很多情况下都适合我们的场景。但是有些情况下,Spark内部不能符合咱们的需求,这时候我们就可以自定义分区策略。为此,Spark提供了相应的接口,我们只需要扩展Partitioner
抽象类,然后实现里面的三个方法:
package org.apache.spark
/**
* An object that defines how the elements in a key-value pair RDD are partitioned by key.
* Maps each key to a partition ID, from 0 to `numPartitions - 1`.
*/
abstract class Partitioner extends Serializable {
def numPartitions : Int
def getPartition(key : Any) : Int
}
|
def numPartitions: Int
:这个方法需要返回你想要创建分区的个数;
def getPartition(key: Any): Int
:这个函数需要对输入的key做计算,然后返回该key的分区ID,范围一定是0到numPartitions-1
;
equals()
:这个是Java标准的判断相等的函数,之所以要求用户实现这个函数是因为Spark内部会比较两个RDD的分区是否一样。
假如我们想把来自同一个域名的URL放到一台节点上,比如:https://www.iteblog.com
和https://www.iteblog.com/archives/1368
,如果你使用HashPartitioner
,这两个URL的Hash值可能不一样,这就使得这两个URL被放到不同的节点上。所以这种情况下我们就需要自定义我们的分区策略,可以如下实现:
package com.iteblog.utils
import org.apache.spark.Partitioner
/**
* User: 过往记忆
* Date: 2015-05-21
* Time: 下午23:34
* bolg: https://www.iteblog.com
* 本文地址:https://www.iteblog.com/archives/1368
* 过往记忆博客,专注于hadoop、hive、spark、shark、flume的技术博客,大量的干货
* 过往记忆博客微信公共帐号:iteblog_hadoop
*/
class IteblogPartitioner(numParts : Int) extends Partitioner {
override def numPartitions : Int = numParts
override def getPartition(key : Any) : Int = {
val domain = new java.net.URL(key.toString).getHost()
val code = (domain.hashCode % numPartitions)
if (code < 0 ) {
code + numPartitions
} else {
code
}
}
override def equals(other : Any) : Boolean = other match {
case iteblog : IteblogPartitioner = >
iteblog.numPartitions == numPartitions
case _ = >
false
}
override def hashCode : Int = numPartitions
}
|
因为hashCode
值可能为负数,所以我们需要对他进行处理。然后我们就可以在partitionBy()
方法里面使用我们的分区:
iteblog.partitionBy( new IteblogPartitioner( 20 ))
|
类似的,在Java中定义自己的分区策略和Scala类似,只需要继承org.apache.spark.Partitioner
,并实现其中的方法即可。
在Python中,你不需要扩展Partitioner类,我们只需要对iteblog.partitionBy()
加上一个额外的hash函数,如下:
import urlparse
def iteblog_domain(url):
return hash (urlparse.urlparse(url).netloc)
iteblog.partitionBy( 20 , iteblog_domain)
|
Spark自定义分区(Partitioner)相关推荐
- Spark自定义分区器
spark目前支持两个分区器,分别是HashPartitioner和RangePartitioner. 均继承自Partitioner,有共同方法 - def numPartitions --分区器的 ...
- Spark数据分区(partitionBy分区、partitioner获取分区方式、自定义分区)
数据分区 partitionBy分区 在分布式程序中,通信的代价是很大的,因此控制数据分布以获得最少的网络传输可以极大地提升整体性能.和单节点的程序需要为记录集合选择合适的数据结构一样,Spark 程 ...
- 自定义实现spark的分区函数
有时自己的业务需要自己实现spark的分区函数 以下代码是实现一个自定义spark分区的demo 实现的功能是根据key值的最后一位数字,写到不同的文件 例如: 10写入到part-00000 11写 ...
- 21,spark sql 测试 : 1.4G 文件实战,测试耗时多少,先分区,再在分区内计算,用列内容分区( 这是一个很魔幻的问题 ),自定义分区
一 ,常规问题 : 1 ,表关联,数据过滤 : sql select stock.area,goods.smallLei,goods.typeColorId, weekofyear(to_date(s ...
- IDEA本地运行Spark项目[演示自定义分区器]并查看HDFS结果文件
文章目录 一.提出问题 二.解决问题 (一)添加IP到主机名的映射 (二)在本地准备Spark库文件 (三)在IDEA里创建Scala项目 (四)添加Spark库文件到项目 (五)创建自定义分区器 ( ...
- 【大数据开发】SparkCore——自定义排序、实现序列化、自定义分区器
文章目录 一.自定义排序四种方式.实现序列化 二.案例:自定义分区器 一.自定义排序四种方式.实现序列化 前面两种是样例类实现.普通类实现 第三种方式可以不实现序列化接口 用的最多的还是第四种方式,第 ...
- kafka模拟生产-消费者以及自定义分区
2019独角兽企业重金招聘Python工程师标准>>> 基本概念 kafka中的重要角色 broker:一台kafka服务器就是一个broker,一个集群可有多个broker,一 ...
- java kafka 分区_Java kafka如何实现自定义分区类和拦截器
生产者发送到对应的分区有以下几种方式: (1)指定了patition,则直接使用:(可以查阅对应的java api, 有多种参数) (2)未指定patition但指定key,通过对key的value进 ...
- kafka自定义分区实战
本文来说下kafka自定义分区相关的知识与内容,同时说下springboot整合kafka如何来实现自定义分区 文章目录 Kafka如何实现分区 Kafka集群是如何知道投递到哪个broker中 默认 ...
最新文章
- Bug改到怀疑人生…… | 每日趣闻
- Paddle内置的网络模型
- Spring和JSF集成:选择项目
- python有null吗_Python世界中, 有Java中那种 @NotNull 吗( JetBrains提供的 ) ?
- mysql sum 删除_如果mysql磁盘满了,会发生什么?还真被我遇到了~
- LINUX UBUNTU安装依赖库编译freeswitch
- 系统集成项目管理工程师和PMP®的对比
- 【MisakaHookFinder使用方法】关于如何提取一个文字游戏的文本钩子以供翻译的方法
- python数据笔记分析_python数据分析入门学习笔记
- CAD显示全屏控件(网页版)
- mininet-ovs转发行为与流表不对应
- 如何设置无线路由连接无线wifi
- 这种股权结构一定要远离!
- Hash签名算法入门
- 安全(Security)设计原则(1)
- 笔记 ngrok 内网穿透及其身份认证 authtoken 配置
- [附源码]java毕业设计宠物商店管理系统
- matplotlib更改窗口图标
- 暗黑2浴火重生zclient注册账号
- 管理类联考——写作——素材篇——论说文——写作素材01—志篇:理想•信念
热门文章
- 实现一个简单的银行转账操作
- AnalogClock和DigitalClock时间和日期控件
- C语言课后习题(49)
- java 建立一个graphics对象_java – 我应该显式处理Graphics对象吗?
- 数据库实验一(MySQL基本操作命令总结)
- 周四下午3小时,4个行业分享,尽在信创行业发展高端研讨会数据库专场
- 46个PPT下载丨QCon 2019年全球软件开发大会PPT
- 【假期重磅福利】更新三个Oracle系列课程,共153课时,最低免费获取
- 快讯:Oracle 19c 新特性及官方文档抢鲜下载
- MySQL 数据库“十宗罪”(十大经典错误案例)