自定义分区partitioner实现数据分区存储
Spark中分区器直接决定了RDD中分区的个数、RDD中每条数据经过Shuffle过程属于哪个分区和Reduce的个数
注意:
(1)只有Key-Value类型的RDD才有分区的,非Key-Value类型的RDD分区的值是None
(2)每个RDD的分区ID范围:0~numPartitions-1,决定这个值是属于那个分区的。
参考:http://blog.csdn.net/high2011/article/details/68491115
package com.ljt.spark01.weblog
import java.net.URL
import org.apache.spark.HashPartitioner
import org.apache.spark.SparkConf
import org.apache.spark.SparkContext
/**
* 自定义分区partitioner实现数据分区存储
*/
object UrlCountPartition {
def main(args: Array[String]): Unit = {
val arr_course = Array(“java.itcast.cn”, “php.itcast.cn”, “net.itcast.cn”)
val conf = new SparkConf().setAppName(“AdvUrlCount”)
.setMaster(“local[2]”)
val sc = new SparkContext(conf)
//将数据切分为元组(URL,1)存放在RDDl
val RDD1 = sc.textFile("data/usercount/IT_education.log").map { x =>val f = x.split("\t")//去掉时间,每出现一次URL,记为一个元组(url,1)(f(1), 1)
}
//对相同的key的每个元组的值进行自加
//(http://php.itcast.cn/php/course.shtml,459)
val rdd_urlCount = RDD1.reduceByKey(_ + _)//获取url的前缀Host做为课程标识
//(php.itcast.cn,http://php.itcast.cn/php/course.shtml,459)
val rdd_urlHost = rdd_urlCount.map(f => {val url = f._1val countUrl = f._2val host = new URL(url).getHost//为了方便按照分区内部排序需要使用K-V,元组(host, (url, countUrl))
}).cache() //cache会将数据缓存到内存当中,cache是一个Transformation,lazy
//url去重,得到所有host课程种类
val ints = rdd_urlHost.map(_._1).distinct().collect()
//实例化分区
val hostPartitioner = new HostPartition(ints)
//每个分区内部排序,取出前3名
val rdd_Partitioners = rdd_urlHost.partitionBy(hostPartitioner).mapPartitions(it => {it.toList.sortBy(_._2._2).reverse.take(3).iterator})rdd_Partitioners.saveAsTextFile("data/out/out_partitioner")
/*** ArrayBuffer((net.itcast.cn,(http://net.itcast.cn/net/course.shtml,521)), (net.itcast.cn,(http://net.itcast.cn/net/video.shtml,521)), (net.itcast.cn,(http://net.itcast.cn/net/teacher.shtml,512)), (java.itcast.cn,(http://java.itcast.cn/java/course/cloud.shtml,1028)), (java.itcast.cn,(http://java.itcast.cn/java/course/javaee.shtml,1000)), (java.itcast.cn,(http://java.itcast.cn/java/course/base.shtml,543)), (php.itcast.cn,(http://php.itcast.cn/php/video.shtml,490)), (php.itcast.cn,(http://php.itcast.cn/php/teacher.shtml,464)), (php.itcast.cn,(http://php.itcast.cn/php/course.shtml,459)))*/
println(rdd_Partitioners.collect().toBuffer)
sc.stop()
}
}
package com.ljt.spark01.weblogimport org.apache.spark.Partitioner
import scala.collection.mutable.HashMap/** * 重写partition分区,按规则存储分区数据*/
class HostPartition(ins: Array[String]) extends Partitioner {val parMap = new HashMap[String, Int]()var count = 0for (i <- ins) {parMap += (i -> count)count += 1}override def numPartitions: Int = {ins.length}def getPartition(key: Any): Int = {parMap.getOrElse(key.toString(), 0)}}
自定义分区partitioner实现数据分区存储相关推荐
- 数据密集型应用系统设计--数据分区
数据分区与数据复制 分区的目的一般是提高可扩展性.容错性和集群吞吐,同一个分区会在多个节点中都有副本. 容错性:一个节点挂掉,则这个节点上的分区,在其他节点上都有副本,可以查询其他的节点 可扩展性:新 ...
- 初探redis:redis集群的数据分区和故障转移
redis 集群 根据之前的诸多分析,我们知道单机的redis有很多的局限性,所以可以使用多台机器来实现分区存储,构建更大的数据库,满足更高的业务需求. 之前我们实现的主从复制,可以实现一主多从的架构 ...
- DDIA读书笔记 6 数据分区
6 数据分区 6.1 数据分区与数据复制 面对海量数据集或者非常高的查询压力,只使用复制技术还不够,需要将数据分区,也称为分片 分区通常和复制结合使用 6.2 键值数据的分区 目标:将数据和查询负载均 ...
- Spark数据分区(partitionBy分区、partitioner获取分区方式、自定义分区)
数据分区 partitionBy分区 在分布式程序中,通信的代价是很大的,因此控制数据分布以获得最少的网络传输可以极大地提升整体性能.和单节点的程序需要为记录集合选择合适的数据结构一样,Spark 程 ...
- 07_clickhouse、自定义分区及底层存储合并机制、自定义分区键、分区目录的命名规则、分区目录的合并过程、分区目录的合并过程、分区表达式指定、分区案例
4.自定义分区及底层存储合并机制 4.1.自定义分区键 4.2.分区目录的命名规则 4.3.分区目录的合并过程 4.4.分区目录的合并过程 4.5.分区表达式指定 4.6.分区案例 4.自定义分区及底 ...
- pg 事务 存储过程_PgpoolII实现数据分区存储及性能分析
李琳琪-中国PG分会志愿者 目录 1 概述 2 配置环境 3 规则定义 4 数据生成 5 分发性能测试 6 总结 概述 Pgpool-II是一个位于PostgreSQL服务器和PostgreSQL数据 ...
- spark分区连接mysql_Spark数据存储和分区操作
Spark数据读取 对于存储在本地文件系统或分布式文件系统(HDFS.Amazon S3)中的数据,Spark可以访问很多种不同的文件格式,比如文本文件.JSON.SequenceFile Spark ...
- linux 挂载有数据硬盘分区,linux下磁盘分区、挂载知多少
0x01 Linux 分区简介 主分区 vs 扩展分区 硬盘分区表中最多能存储四个分区,但我们实际使用时一般只分为两个分区,一个是主分区(Primary Partion)一个是扩展分区(extende ...
- tablesample oracle,Hive DDL 内部表外部表 分区 分桶 行格式 存储文件格式 概述
创建数据库 CREATE DATABASE [IF NOT EXISTS] database_name [COMMENT database_comment] [LOCATION hdfs_path] ...
- flink keyby、shuffle、 rebalance、rescale、 broadcast、global、自定义分区算子以及各分区器源码
文章目录 前言 1. 随机分区 2. 轮询分区 3. 重缩放分区 4. 广播 5. 全局分区 6. 自定义分区 前言 flink中keyBy是一种按照键的哈希值来进行重新分区的操作,至于分区是否均 ...
最新文章
- IntelliJ IDEA上操作GitHub
- 金陵科技学院c语言测试,金陵科技学院C语言实验册.doc
- Struts2的类型转换(下)
- concat函数java_MySql中CONCAT(str1,str2,...)函数
- Java并发编程:Synchronized底层优化(偏向锁、轻量级锁)
- linux查看镜像的详细信息,docker inspect命令查看镜像详细信息
- 服务器上有涉密文件,涉密文件保密管理规定
- flex结合asp.net上传深入详细解说(转载)
- 全球软件巨头 Software AG 遭勒索攻击
- 阿里云原生专家复礼:多活容灾建设思路与经验分享
- Chrome浏览器升级后提示崩溃
- Dev-C++ 提示源文件未编译,原因及解决办法
- 瀚高数据库在Linux上安装,瀚高数据库和postgresql并存,安装瀚高数据库问题的一种解决方案(APP)...
- 《好吗好的》--大冰
- Android UI 设计规范,Android高分面试指南
- 谷歌云|机密 GKE 节点可在计算优化的 C2D 虚拟机上使用
- 打印机 正在删除正在打印怎么也删除不了
- 关于MIT6.828_HW9_barriers xv6 homework9的一些问题
- 14、Spring Clou14——路由配置细节
- 分享Html模板5合一模板---50电影模板、56个游、86个体育项目、95个音乐网站、116个时尚
热门文章
- 2d游戏动作软件支持c语言,C语言编写简单2D游戏
- 查找算法-------插值查找
- Flutter之BuilderContext和Widget关系浅析
- java多数据源事务处理_springboot整合多数据源解决分布式事务
- docker多个mysql实例_Docker创建运行多个mysql容器的方法示例
- PHP-----strpos() 函数的用法
- linux 终端 收取邮件,linux mail 命令 (收发邮件)
- mysql 1236错误_MySQL主主同步环境出现1236错误
- mysql时间设计模式_java 23种设计模式及具体例子 收藏有时间慢慢看
- html添加兄弟元素,jquery插入兄弟节点的操作方法