Spark中分区器直接决定了RDD中分区的个数、RDD中每条数据经过Shuffle过程属于哪个分区和Reduce的个数
注意:
(1)只有Key-Value类型的RDD才有分区的,非Key-Value类型的RDD分区的值是None
(2)每个RDD的分区ID范围:0~numPartitions-1,决定这个值是属于那个分区的。
参考:http://blog.csdn.net/high2011/article/details/68491115

package com.ljt.spark01.weblog

import java.net.URL

import org.apache.spark.HashPartitioner
import org.apache.spark.SparkConf
import org.apache.spark.SparkContext

/**
* 自定义分区partitioner实现数据分区存储
*/
object UrlCountPartition {

def main(args: Array[String]): Unit = {
val arr_course = Array(“java.itcast.cn”, “php.itcast.cn”, “net.itcast.cn”)
val conf = new SparkConf().setAppName(“AdvUrlCount”)
.setMaster(“local[2]”)
val sc = new SparkContext(conf)

//将数据切分为元组(URL,1)存放在RDDl
val RDD1 = sc.textFile("data/usercount/IT_education.log").map { x =>val f = x.split("\t")//去掉时间,每出现一次URL,记为一个元组(url,1)(f(1), 1)
}
//对相同的key的每个元组的值进行自加
//(http://php.itcast.cn/php/course.shtml,459)
val rdd_urlCount = RDD1.reduceByKey(_ + _)//获取url的前缀Host做为课程标识
//(php.itcast.cn,http://php.itcast.cn/php/course.shtml,459)
val rdd_urlHost = rdd_urlCount.map(f => {val url = f._1val countUrl = f._2val host = new URL(url).getHost//为了方便按照分区内部排序需要使用K-V,元组(host, (url, countUrl))
}).cache() //cache会将数据缓存到内存当中,cache是一个Transformation,lazy
//url去重,得到所有host课程种类
val ints = rdd_urlHost.map(_._1).distinct().collect()
//实例化分区
val hostPartitioner = new HostPartition(ints)
//每个分区内部排序,取出前3名
val rdd_Partitioners = rdd_urlHost.partitionBy(hostPartitioner).mapPartitions(it => {it.toList.sortBy(_._2._2).reverse.take(3).iterator})rdd_Partitioners.saveAsTextFile("data/out/out_partitioner")
/*** ArrayBuffer((net.itcast.cn,(http://net.itcast.cn/net/course.shtml,521)), (net.itcast.cn,(http://net.itcast.cn/net/video.shtml,521)), (net.itcast.cn,(http://net.itcast.cn/net/teacher.shtml,512)), (java.itcast.cn,(http://java.itcast.cn/java/course/cloud.shtml,1028)), (java.itcast.cn,(http://java.itcast.cn/java/course/javaee.shtml,1000)), (java.itcast.cn,(http://java.itcast.cn/java/course/base.shtml,543)), (php.itcast.cn,(http://php.itcast.cn/php/video.shtml,490)), (php.itcast.cn,(http://php.itcast.cn/php/teacher.shtml,464)), (php.itcast.cn,(http://php.itcast.cn/php/course.shtml,459)))*/
println(rdd_Partitioners.collect().toBuffer)
sc.stop()

}

}

 package com.ljt.spark01.weblogimport org.apache.spark.Partitioner
import scala.collection.mutable.HashMap/** * 重写partition分区,按规则存储分区数据*/
class HostPartition(ins: Array[String]) extends Partitioner {val parMap = new HashMap[String, Int]()var count = 0for (i <- ins) {parMap += (i -> count)count += 1}override def numPartitions: Int = {ins.length}def getPartition(key: Any): Int = {parMap.getOrElse(key.toString(), 0)}}

自定义分区partitioner实现数据分区存储相关推荐

  1. 数据密集型应用系统设计--数据分区

    数据分区与数据复制 分区的目的一般是提高可扩展性.容错性和集群吞吐,同一个分区会在多个节点中都有副本. 容错性:一个节点挂掉,则这个节点上的分区,在其他节点上都有副本,可以查询其他的节点 可扩展性:新 ...

  2. 初探redis:redis集群的数据分区和故障转移

    redis 集群 根据之前的诸多分析,我们知道单机的redis有很多的局限性,所以可以使用多台机器来实现分区存储,构建更大的数据库,满足更高的业务需求. 之前我们实现的主从复制,可以实现一主多从的架构 ...

  3. DDIA读书笔记 6 数据分区

    6 数据分区 6.1 数据分区与数据复制 面对海量数据集或者非常高的查询压力,只使用复制技术还不够,需要将数据分区,也称为分片 分区通常和复制结合使用 6.2 键值数据的分区 目标:将数据和查询负载均 ...

  4. Spark数据分区(partitionBy分区、partitioner获取分区方式、自定义分区)

    数据分区 partitionBy分区 在分布式程序中,通信的代价是很大的,因此控制数据分布以获得最少的网络传输可以极大地提升整体性能.和单节点的程序需要为记录集合选择合适的数据结构一样,Spark 程 ...

  5. 07_clickhouse、自定义分区及底层存储合并机制、自定义分区键、分区目录的命名规则、分区目录的合并过程、分区目录的合并过程、分区表达式指定、分区案例

    4.自定义分区及底层存储合并机制 4.1.自定义分区键 4.2.分区目录的命名规则 4.3.分区目录的合并过程 4.4.分区目录的合并过程 4.5.分区表达式指定 4.6.分区案例 4.自定义分区及底 ...

  6. pg 事务 存储过程_PgpoolII实现数据分区存储及性能分析

    李琳琪-中国PG分会志愿者 目录 1 概述 2 配置环境 3 规则定义 4 数据生成 5 分发性能测试 6 总结 概述 Pgpool-II是一个位于PostgreSQL服务器和PostgreSQL数据 ...

  7. spark分区连接mysql_Spark数据存储和分区操作

    Spark数据读取 对于存储在本地文件系统或分布式文件系统(HDFS.Amazon S3)中的数据,Spark可以访问很多种不同的文件格式,比如文本文件.JSON.SequenceFile Spark ...

  8. linux 挂载有数据硬盘分区,linux下磁盘分区、挂载知多少

    0x01 Linux 分区简介 主分区 vs 扩展分区 硬盘分区表中最多能存储四个分区,但我们实际使用时一般只分为两个分区,一个是主分区(Primary Partion)一个是扩展分区(extende ...

  9. tablesample oracle,Hive DDL 内部表外部表 分区 分桶 行格式 存储文件格式 概述

    创建数据库 CREATE DATABASE [IF NOT EXISTS] database_name [COMMENT database_comment] [LOCATION hdfs_path] ...

  10. flink keyby、shuffle、 rebalance、rescale、 broadcast、global、自定义分区算子以及各分区器源码

    文章目录 前言 1. 随机分区 2. 轮询分区 3. 重缩放分区 4. 广播 5. 全局分区 6. 自定义分区 前言   flink中keyBy是一种按照键的哈希值来进行重新分区的操作,至于分区是否均 ...

最新文章

  1. IntelliJ IDEA上操作GitHub
  2. 金陵科技学院c语言测试,金陵科技学院C语言实验册.doc
  3. Struts2的类型转换(下)
  4. concat函数java_MySql中CONCAT(str1,str2,...)函数
  5. Java并发编程:Synchronized底层优化(偏向锁、轻量级锁)
  6. linux查看镜像的详细信息,docker inspect命令查看镜像详细信息
  7. 服务器上有涉密文件,涉密文件保密管理规定
  8. flex结合asp.net上传深入详细解说(转载)
  9. 全球软件巨头 Software AG 遭勒索攻击
  10. 阿里云原生专家复礼:多活容灾建设思路与经验分享
  11. Chrome浏览器升级后提示崩溃
  12. Dev-C++ 提示源文件未编译,原因及解决办法
  13. 瀚高数据库在Linux上安装,瀚高数据库和postgresql并存,安装瀚高数据库问题的一种解决方案(APP)...
  14. 《好吗好的》--大冰
  15. Android UI 设计规范,Android高分面试指南
  16. 谷歌云|机密 GKE 节点可在计算优化的 C2D 虚拟机上使用
  17. 打印机 正在删除正在打印怎么也删除不了
  18. 关于MIT6.828_HW9_barriers xv6 homework9的一些问题
  19. 14、Spring Clou14——路由配置细节
  20. 分享Html模板5合一模板---50电影模板、56个游、86个体育项目、95个音乐网站、116个时尚

热门文章

  1. 2d游戏动作软件支持c语言,C语言编写简单2D游戏
  2. 查找算法-------插值查找
  3. Flutter之BuilderContext和Widget关系浅析
  4. java多数据源事务处理_springboot整合多数据源解决分布式事务
  5. docker多个mysql实例_Docker创建运行多个mysql容器的方法示例
  6. PHP-----strpos() 函数的用法
  7. linux 终端 收取邮件,linux mail 命令 (收发邮件)
  8. mysql 1236错误_MySQL主主同步环境出现1236错误
  9. mysql时间设计模式_java 23种设计模式及具体例子 收藏有时间慢慢看
  10. html添加兄弟元素,jquery插入兄弟节点的操作方法