更多详细内容

数据分区:

在分布式集群里,网络通信的代价很大,减少网络传输可以极大提升性能。

mapreduce框架的性能开支主要在io和网络传输,io因为要大量读写文件,它是不可避免的,但是网络传输是可以避免的,把大文件压缩变小文件,从而减少网络传输,但是增加了cpu的计算负载。

spark里面io也是不可避免的,但是网络传输spark里面进行了优化:

spark把rdd进行分区(分片),放在集群上并行计算。

同一个rdd分片100个,10个节点,平均一个节点10个分区

当进行sum型的计算的时候,先进行每个分区的sum,然后把sum值shuffle传输到主程序进行全局sum,所以进行sum型计算对网络传输非常小。

但对于进行join型的计算的时候,需要把数据本身进行shuffle,网络开销很大。

spark是如何优化这个问题的呢?

spark把key-value rdd通过key的hashcode进行分区,而且保证相同的key存储在同一个节点上,这样对改rdd进行key聚合时,就不需要shuffle过程

我们进行mapreduce计算的时候为什么要尽兴shuffle?,就是说mapreduce里面网络传输主要在shuffle阶段,shuffle的根本原因是相同的key存在不同的节点上,按key进行聚合的时候不得不进行shuffle。shuffle是非常影响网络的,它要把所有的数据混在一起走网络,然后它才能把相同的key走到一起。要尽兴shuffle是存储决定的。

spark从这个教训中得到启发,spark会把key进行分区,也就是key的hashcode进行分区,相同的key,hashcode肯定是一样的,所以它进行分区的时候100t的数据分成10分,每部分10个t,它能确保相同的key肯定在一个分区里面,而且它能保证存储的时候相同的key能够存在同一个节点上。

比如一个rdd分成了100份,集群有10个节点,所以每个节点存10份,每一分称为每个分区,spark能保证相同的key存在同一个节点上,实际上相同的key存在同一个分区。

key的分布不均决定了有的分区大有的分区小。没法分区保证完全相等,但它会保证在一个接近的范围。

所以mapreduce里面做的某些工作里边,spark就不需要shuffle了,spark解决网络传输这块的根本原理就是这个。

进行join的时候是两个表,不可能把两个表都分区好,通常情况下是把用的频繁的大表事先进行分区,小表进行关联它的时候小表进行shuffle过程。

大表不需要shuffle。

模版是:

val userData = sc.sequenceFile[UserID,UserInfo]("hdfs://...")

.partitionBy(new HashPartition(100))//构造100个分区

.persist()

从分区中获益的操作:cogroup(), groupwith(),join(),leftOuterJoin(),rightOuterJoin(),groupByKey(),reduceByKey(),cobimeByKey(),lookup()

所有基于key的操作都会获益

对于诸如cogroup()和join()这样的二元操作,预先进行数据分区会让其中至少一个rdd(使用已知分区器的那个rdd)不发生数据shuffle,如果两个rdd使用同样的分区方式,并且它们还缓存在同样的机器上(比如一个rdd是通过mapvalues()从另一个rdd中创建出来的,这两个rdd就会拥有相同的key和分区方式),或者其中rdd还没有被计算出来,那么跨界点的shuffle(数据混洗)不会发生了。

mapreduce一般要求本地网卡达到20兆!即便进行了压缩!

代码:

[mw_shl_code=scala,true]import org.apache.hadoop.hive.ql.exec.persistence.HybridHashTableContainer.HashPartition

import org.apache.hadoop.mapred.lib

import org.apache.spark.SparkConf

import org.apache.spark.sql.SparkSession

import org.apache.spark.storage.StorageLevel

import org.apache.spark.HashPartitioner

/**

* Created by zengxiaosen on 16/9/23.

*/

object PartitionVisitCount {

/*

大表小表关联

*/

def main(args: Array[String]): Unit = {

val sparkConf = new SparkConf().setAppName("useUDF").setMaster("local")

val ss = SparkSession.builder().config(sparkConf).getOrCreate()

val sc = ss.sparkContext

val fileRDD = sc.textFile("/opt/tarballs/spark_kafka/beifengspark/src/main/scala/2015082818")

.filter(line=>line.length>0)

.map{

line =>

val arr = line.split("\t")

val date = arr(17).substring(0,10)

val guid = arr(5)

val url = arr(1)

val uid = arr(18)

(uid,(guid,url)) //key-value:tuple2

}.partitionBy(new HashPartitioner(10)) //采用了hashcode分片方式,分成了10份,十个分区,每个分区10分

/*

相同的key在同一个分区,在进行任务调用时候,大表不需要任何shuffle

只需要shuffle小表

*/

.persist(StorageLevel.DISK_ONLY)

/*

parallelize有两个参数,第一个是他的list,第二个是分区数

分区数可以不给,不给的情况下默认就是它的核数

*/

//比如里面存的是我的用户id

val rdd = sc.parallelize(List(1,2,3,4,5,6,7),10)

.map(i => (i+"", i+"str"))

fileRDD.join(rdd).foreach(println)

/*

如果fileRDD后面还会关联好多个其他的rdd1,rdd2。。。rddn

就要先把大的fileRDD进行分区

这样优化了网络传输

*/

}

}[/mw_shl_code]

spark更改分区_spark RDD分区是否可以指定分区相关推荐

  1. spark应用程序转换_Spark—RDD编程常用转换算子代码实例

    Spark-RDD编程常用转换算子代码实例 Spark rdd 常用 Transformation 实例: 1.def map[U: ClassTag](f: T => U): RDD[U]  ...

  2. db2有主键时默认hash分区_Spark RDD依赖关系以及分区属性

    RDD的依赖关系 Spark中使用DAG(有向无环图)来描述RDD之间的依赖关系,根据依赖关系的不同,划分为宽依赖和窄依赖 通过上图,可以很容易得出所谓宽依赖:多个子RDD的partition会依赖同 ...

  3. Spark算子:统计RDD分区中的元素及数量

    Spark RDD是被分区的,在生成RDD时候,一般可以指定分区的数量,如果不指定分区数量,当RDD从集合创建时候,则默认为该程序所分配到的资源的CPU核数,如果是从HDFS文件创建,默认为文件的Bl ...

  4. 【kafka】kafka 指定分区消费 不会触发 reblance

    文章目录 1.概述 2.验证 2.1 2个都是subscribeTopic 2.2 指定消费与全部消费 2.3 两个指定消费 2.4 2个都消费同样的分区呢? 1.概述 今天在博客:Kafka-消费, ...

  5. oracle insert指定分区,oracle partition 分区建立详解

    Oracle9i通过引入列表分区(List Partition),使得当前共有4种分区数据 方法,具体列出如下: 字串9 第一种 范围分区 字串4 1 对表进行单列 范围分区: 字串6 这使最为常用也 ...

  6. Linux系统格式化分区用哪个命令,linux系统格式化分区的命令是什么

    linux系统格式化分区的命令是什么 发布时间:2020-04-21 11:19:16 来源:亿速云 阅读:486 作者:小新 本文在介绍关于linux系统格式化分区的命令是什么,重点探讨了其具体步骤 ...

  7. spark sql 查看分区_Spark SQL解析查询parquet格式Hive表获取分区字段和查询条件

    首先说一下,这里解决的问题应用场景: sparksql处理Hive表数据时,判断加载的是否是分区表,以及分区表的字段有哪些?再进一步限制查询分区表必须指定分区? 这里涉及到两种情况:select SQ ...

  8. spark中repartition, coalesce, partitionBy, repartitionAndSortWithinPartitions 四种重分区算子

    美图欣赏: 一.背景 spark中一共有四种重分区算子: 1.repartition 2.coalesce 3.partitionBy 4.repartitionAndSortWithinPartit ...

  9. 什么是spark的惰性计算?有什么优势?_spark——spark中常说RDD,究竟RDD是什么?

    本文始发于个人公众号:TechFlow,原创不易,求个关注 今天是spark专题第二篇文章,我们来看spark非常重要的一个概念--RDD. 在上一讲当中我们在本地安装好了spark,虽然我们只有lo ...

最新文章

  1. 马克思关于劳动的八大金句
  2. 【转载】Gradle学习 第十一章:使用Gradle命令行
  3. NTFS权限笔记 2017-12-4
  4. 【MySQL】MySQL STRAIGHT JOIN 使用案例以及简介
  5. R for data science之purrr包(上)
  6. 邪恶力量第一至九季/全集Supernatural迅雷下载
  7. 全套web前端课程思维导图+视频+源码 web高端课程 深入学习 624个视频教程
  8. 获取用户的中文姓名,手机号,邮箱,地址,年龄等随机信息,MD5加密等常用的工具。
  9. Google Earth导入GPS设备NMEA文本数据
  10. 计算机无法访问,您可能没有权限使用网络资源.请与这台服务器的管理员联系
  11. 如何优化网站才能让网站打开速度更快
  12. 微信小程序 09 前后端交互
  13. 主板cpu盖板怎么盖回去
  14. 25、Java面向对象——抽象类和抽象方法、接口
  15. C++ 实现哈夫曼树和哈夫曼编码
  16. Store Forwarding
  17. 不只是休闲:关于体感游戏的一些思考(一)--- 开篇和“随身”物件
  18. List集合(列表)
  19. Jdbc系列八:批量处理
  20. 使用计算机正确坐姿,一种计算机正确使用坐姿纠正装置的制作方法

热门文章

  1. 这里有一个3天的秘境邀请!
  2. 大量的 TIME_WAIT 状态 TCP 连接,对业务有什么影响?
  3. Spring Cloud Alibaba基础教程:Nacos配置的多环境管理
  4. 我说分布式事务之最大努力通知型事务
  5. ai为什么要栅格化_英语学习为什么不能“碎片化”?要想学好英语,系统化是关键...
  6. java 升级1.8_升级系统中的java到1.8版本详解
  7. 帝国cms会员充值交易推广分润系统的界面实现与开发记录
  8. 点云配准 PointNet + Concat + FC
  9. MultiBox_Loss bug改进
  10. `pydot` failed to call GraphViz.Please install GraphViz