spark更改分区_spark RDD分区是否可以指定分区
更多详细内容
数据分区:
在分布式集群里,网络通信的代价很大,减少网络传输可以极大提升性能。
mapreduce框架的性能开支主要在io和网络传输,io因为要大量读写文件,它是不可避免的,但是网络传输是可以避免的,把大文件压缩变小文件,从而减少网络传输,但是增加了cpu的计算负载。
spark里面io也是不可避免的,但是网络传输spark里面进行了优化:
spark把rdd进行分区(分片),放在集群上并行计算。
同一个rdd分片100个,10个节点,平均一个节点10个分区
当进行sum型的计算的时候,先进行每个分区的sum,然后把sum值shuffle传输到主程序进行全局sum,所以进行sum型计算对网络传输非常小。
但对于进行join型的计算的时候,需要把数据本身进行shuffle,网络开销很大。
spark是如何优化这个问题的呢?
spark把key-value rdd通过key的hashcode进行分区,而且保证相同的key存储在同一个节点上,这样对改rdd进行key聚合时,就不需要shuffle过程
我们进行mapreduce计算的时候为什么要尽兴shuffle?,就是说mapreduce里面网络传输主要在shuffle阶段,shuffle的根本原因是相同的key存在不同的节点上,按key进行聚合的时候不得不进行shuffle。shuffle是非常影响网络的,它要把所有的数据混在一起走网络,然后它才能把相同的key走到一起。要尽兴shuffle是存储决定的。
spark从这个教训中得到启发,spark会把key进行分区,也就是key的hashcode进行分区,相同的key,hashcode肯定是一样的,所以它进行分区的时候100t的数据分成10分,每部分10个t,它能确保相同的key肯定在一个分区里面,而且它能保证存储的时候相同的key能够存在同一个节点上。
比如一个rdd分成了100份,集群有10个节点,所以每个节点存10份,每一分称为每个分区,spark能保证相同的key存在同一个节点上,实际上相同的key存在同一个分区。
key的分布不均决定了有的分区大有的分区小。没法分区保证完全相等,但它会保证在一个接近的范围。
所以mapreduce里面做的某些工作里边,spark就不需要shuffle了,spark解决网络传输这块的根本原理就是这个。
进行join的时候是两个表,不可能把两个表都分区好,通常情况下是把用的频繁的大表事先进行分区,小表进行关联它的时候小表进行shuffle过程。
大表不需要shuffle。
模版是:
val userData = sc.sequenceFile[UserID,UserInfo]("hdfs://...")
.partitionBy(new HashPartition(100))//构造100个分区
.persist()
从分区中获益的操作:cogroup(), groupwith(),join(),leftOuterJoin(),rightOuterJoin(),groupByKey(),reduceByKey(),cobimeByKey(),lookup()
所有基于key的操作都会获益
对于诸如cogroup()和join()这样的二元操作,预先进行数据分区会让其中至少一个rdd(使用已知分区器的那个rdd)不发生数据shuffle,如果两个rdd使用同样的分区方式,并且它们还缓存在同样的机器上(比如一个rdd是通过mapvalues()从另一个rdd中创建出来的,这两个rdd就会拥有相同的key和分区方式),或者其中rdd还没有被计算出来,那么跨界点的shuffle(数据混洗)不会发生了。
mapreduce一般要求本地网卡达到20兆!即便进行了压缩!
代码:
[mw_shl_code=scala,true]import org.apache.hadoop.hive.ql.exec.persistence.HybridHashTableContainer.HashPartition
import org.apache.hadoop.mapred.lib
import org.apache.spark.SparkConf
import org.apache.spark.sql.SparkSession
import org.apache.spark.storage.StorageLevel
import org.apache.spark.HashPartitioner
/**
* Created by zengxiaosen on 16/9/23.
*/
object PartitionVisitCount {
/*
大表小表关联
*/
def main(args: Array[String]): Unit = {
val sparkConf = new SparkConf().setAppName("useUDF").setMaster("local")
val ss = SparkSession.builder().config(sparkConf).getOrCreate()
val sc = ss.sparkContext
val fileRDD = sc.textFile("/opt/tarballs/spark_kafka/beifengspark/src/main/scala/2015082818")
.filter(line=>line.length>0)
.map{
line =>
val arr = line.split("\t")
val date = arr(17).substring(0,10)
val guid = arr(5)
val url = arr(1)
val uid = arr(18)
(uid,(guid,url)) //key-value:tuple2
}.partitionBy(new HashPartitioner(10)) //采用了hashcode分片方式,分成了10份,十个分区,每个分区10分
/*
相同的key在同一个分区,在进行任务调用时候,大表不需要任何shuffle
只需要shuffle小表
*/
.persist(StorageLevel.DISK_ONLY)
/*
parallelize有两个参数,第一个是他的list,第二个是分区数
分区数可以不给,不给的情况下默认就是它的核数
*/
//比如里面存的是我的用户id
val rdd = sc.parallelize(List(1,2,3,4,5,6,7),10)
.map(i => (i+"", i+"str"))
fileRDD.join(rdd).foreach(println)
/*
如果fileRDD后面还会关联好多个其他的rdd1,rdd2。。。rddn
就要先把大的fileRDD进行分区
这样优化了网络传输
*/
}
}[/mw_shl_code]
spark更改分区_spark RDD分区是否可以指定分区相关推荐
- spark应用程序转换_Spark—RDD编程常用转换算子代码实例
Spark-RDD编程常用转换算子代码实例 Spark rdd 常用 Transformation 实例: 1.def map[U: ClassTag](f: T => U): RDD[U] ...
- db2有主键时默认hash分区_Spark RDD依赖关系以及分区属性
RDD的依赖关系 Spark中使用DAG(有向无环图)来描述RDD之间的依赖关系,根据依赖关系的不同,划分为宽依赖和窄依赖 通过上图,可以很容易得出所谓宽依赖:多个子RDD的partition会依赖同 ...
- Spark算子:统计RDD分区中的元素及数量
Spark RDD是被分区的,在生成RDD时候,一般可以指定分区的数量,如果不指定分区数量,当RDD从集合创建时候,则默认为该程序所分配到的资源的CPU核数,如果是从HDFS文件创建,默认为文件的Bl ...
- 【kafka】kafka 指定分区消费 不会触发 reblance
文章目录 1.概述 2.验证 2.1 2个都是subscribeTopic 2.2 指定消费与全部消费 2.3 两个指定消费 2.4 2个都消费同样的分区呢? 1.概述 今天在博客:Kafka-消费, ...
- oracle insert指定分区,oracle partition 分区建立详解
Oracle9i通过引入列表分区(List Partition),使得当前共有4种分区数据 方法,具体列出如下: 字串9 第一种 范围分区 字串4 1 对表进行单列 范围分区: 字串6 这使最为常用也 ...
- Linux系统格式化分区用哪个命令,linux系统格式化分区的命令是什么
linux系统格式化分区的命令是什么 发布时间:2020-04-21 11:19:16 来源:亿速云 阅读:486 作者:小新 本文在介绍关于linux系统格式化分区的命令是什么,重点探讨了其具体步骤 ...
- spark sql 查看分区_Spark SQL解析查询parquet格式Hive表获取分区字段和查询条件
首先说一下,这里解决的问题应用场景: sparksql处理Hive表数据时,判断加载的是否是分区表,以及分区表的字段有哪些?再进一步限制查询分区表必须指定分区? 这里涉及到两种情况:select SQ ...
- spark中repartition, coalesce, partitionBy, repartitionAndSortWithinPartitions 四种重分区算子
美图欣赏: 一.背景 spark中一共有四种重分区算子: 1.repartition 2.coalesce 3.partitionBy 4.repartitionAndSortWithinPartit ...
- 什么是spark的惰性计算?有什么优势?_spark——spark中常说RDD,究竟RDD是什么?
本文始发于个人公众号:TechFlow,原创不易,求个关注 今天是spark专题第二篇文章,我们来看spark非常重要的一个概念--RDD. 在上一讲当中我们在本地安装好了spark,虽然我们只有lo ...
最新文章
- 马克思关于劳动的八大金句
- 【转载】Gradle学习 第十一章:使用Gradle命令行
- NTFS权限笔记 2017-12-4
- 【MySQL】MySQL STRAIGHT JOIN 使用案例以及简介
- R for data science之purrr包(上)
- 邪恶力量第一至九季/全集Supernatural迅雷下载
- 全套web前端课程思维导图+视频+源码 web高端课程 深入学习 624个视频教程
- 获取用户的中文姓名,手机号,邮箱,地址,年龄等随机信息,MD5加密等常用的工具。
- Google Earth导入GPS设备NMEA文本数据
- 计算机无法访问,您可能没有权限使用网络资源.请与这台服务器的管理员联系
- 如何优化网站才能让网站打开速度更快
- 微信小程序 09 前后端交互
- 主板cpu盖板怎么盖回去
- 25、Java面向对象——抽象类和抽象方法、接口
- C++ 实现哈夫曼树和哈夫曼编码
- Store Forwarding
- 不只是休闲:关于体感游戏的一些思考(一)--- 开篇和“随身”物件
- List集合(列表)
- Jdbc系列八:批量处理
- 使用计算机正确坐姿,一种计算机正确使用坐姿纠正装置的制作方法
热门文章
- 这里有一个3天的秘境邀请!
- 大量的 TIME_WAIT 状态 TCP 连接,对业务有什么影响?
- Spring Cloud Alibaba基础教程:Nacos配置的多环境管理
- 我说分布式事务之最大努力通知型事务
- ai为什么要栅格化_英语学习为什么不能“碎片化”?要想学好英语,系统化是关键...
- java 升级1.8_升级系统中的java到1.8版本详解
- 帝国cms会员充值交易推广分润系统的界面实现与开发记录
- 点云配准 PointNet + Concat + FC
- MultiBox_Loss bug改进
- `pydot` failed to call GraphViz.Please install GraphViz