通过分区(Partitioning)提高Spark的运行性能
在Sortable公司,很多数据处理的工作都是使用Spark完成的。在使用Spark的过程中他们发现了一个能够提高Sparkjob性能的一个技巧,也就是修改数据的分区数,本文将举个例子并详细地介绍如何做到的。
查找质数
比如我们需要从2到2000000之间寻找所有的质数。我们很自然地会想到先找到所有的非质数,剩下的所有数字就是我们要找的质数。
我们首先遍历2到2000000之间的每个数,然后找到这些数的所有小于或等于2000000的倍数,在计算的结果中可能会有许多重复的数据(比如6同时是2和3的倍数)但是这并没有啥影响。
我们在Spark shell中计算:
Welcome to
____ __
/ __ / __ ___ _____ / / __
_ \ \/ _ \/ _ `/ __ / ' _ /
/ ___ / . __ /\ _ , _ / _ / / _ /\ _ \ version 1.6 . 1
/ _ /
Using Scala version 2.10 . 5 (Java HotSpot(TM) 64 -Bit Server VM, Java 1.7 . 0 _ 45 )
Type in expressions to have them evaluated.
Type : help for more information.
Spark context available as sc.
SQL context available as sqlContext.
scala> val n = 2000000
n : Int = 2000000
scala> val composite = sc.parallelize( 2 to n, 8 ).map(x = > (x, ( 2 to (n / x)))).flatMap(kv = > kv. _ 2 .map( _ * kv. _ 1 ))
composite : org.apache.spark.rdd.RDD[Int] = MapPartitionsRDD[ 2 ] at flatMap at <console> : 29
scala>
scala> val prime = sc.parallelize( 2 to n, 8 ).subtract(composite)
prime : org.apache.spark.rdd.RDD[Int] = MapPartitionsRDD[ 7 ] at subtract at <console> : 31
scala> prime.collect()
res 0 : Array[Int] = Array( 563249 , 17 , 281609 , 840761 , 1126513 , 1958993 , 840713 , 1959017 , 41 , 281641 , 1681513 , 1126441 , 73 , 1126457 , 89 , 840817 , 97 , 1408009 , 113 , 137 , 1408241 , 563377 , 1126649 , 281737 , 281777 , 840841 , 1408217 , 1681649 , 281761 , 1408201 , 1959161 , 1408177 , 840929 , 563449 , 1126561 , 193 , 1126577 , 1126537 , 1959073 , 563417 , 233 , 281849 , 1126553 , 563401 , 281833 , 241 , 563489 , 281 , 281857 , 257 , 1959241 , 313 , 841081 , 337 , 1408289 , 563561 , 281921 , 353 , 1681721 , 409 , 281993 , 401 , 1126897 , 282001 , 1126889 , 1959361 , 1681873 , 563593 , 433 , 841097 , 1959401 , 1408417 , 1959313 , 1681817 , 457 , 841193 , 449 , 563657 , 282089 , 282097 , 1408409 , 1408601 , 1959521 , 1682017 , 841241 , 1408577 , 569 , 1408633 , 521 , 841273 , 1127033 , 841289 , 617 , 1408529 , 1959457 , 563777 , 841297 , 1959473 , 577 , 593 , 563809 , 601 ,...
|
答案看起来是可靠的,但是我们来看看这个程序的性能。如果我们到Spark UI里面看的话可以发现Spark在整个计算过程中使用了3个stages,下图就是UI中这个计算过程的DAG(Directed Acyclic Graph)可视化图,其中展示了DAG图中不同的RDD计算。
在Spark中,只要job需要在分区之间进行数据交互,那么一个新的stage将会产生(如果使用Spark术语的话,分区之间的数据交互其实就是shuffle)。Spark stage中每个分区将会起一个task进行计算,而这些task负责将这个RDD分区的数据转化(transform)成另外一个RDD分区的数据。我们简单地看下Stage 0的task运行情况:
上图中我们对Duration
和Shuffle Write Size / Records
两列非常感兴趣。sc.parallelize(2 to n, 8)
已经生成了1999999 records,而这写记录均匀地分布到8个分区里面;每个task的计算几乎花费了相同的时间,所以这个stage是没问题的。
Stage 1是比较重要的stage,因为它运行了map
和flatMap
transformation,我们来看看它的运行情况:
从上图可以看出,这个stage运行的并不好,因为工作负载并没有均衡到所有的task中!93%的数据集中在一个task中,而这个task的计算花费了14s;另外一个比较慢的task花费了1s。然而我们提供了8个core用于计算,而其中的7个core在这13s内都在等待这个stage的完成。这对资源的利用非常不高效。
如果想及时了解Spark、Hadoop或者Hbase相关的文章,欢迎关注微信公共帐号:iteblog_hadoop
为什么会出现这种情况?
当我们运行sc.parallelize(2 to n, 8)
语句的时候,Spark使用分区机制将数据很好地分成8个组。它最有可能使用的是range partitioner,也就是说2-250000被分到第一个分区; 250001-500000分到第二个分区等等。然而我们的map函数将这些数转成(key,value)pairs,而value里面的数据大小变化很大(key比较小的时候,value的值就比较多,从而也比较大)。每个value都是一个list,里面存放着我们需要乘上key并小于2000000的倍数值,有一半以上的键值对(所有key大于1000000)的value是空的;而key等于2对应的value是最多的,包含了所有从2到1000000的数据!这就是为什么第一个分区拥有几乎所有的数据,它的计算花费了最多的时间;而最后四个分区几乎没有数据!
如何解决
我们可以将数据重新分区。通过对RDD调用.repartition(numPartitions)
函数将会使Spark触发shuffle并且将数据分布到我们指定的分区数中,所以让我们尝试将这个加入到我们的代码中。
我们除了在.map
和.flatMap
函数之间加上.repartition(8)
之外,其他的代码并不改变。我们的RDD现在同样拥有8个分区,但是现在的数据将会在这些分区重新分布,修改后的代码如下:
/**
* User: 过往记忆
* Date: 2016年6月24日
* Time: 下午21:16
* bolg: http://www.iteblog.com
* 本文地址:http://www.iteblog.com/archives/1695
* 过往记忆博客,专注于hadoop、hive、spark、shark、flume的技术博客,大量的干货
* 过往记忆博客微信公共帐号:iteblog_hadoop
*/
val composite = sc.parallelize( 2 to n, 8 ).map(x = > (x, ( 2 to (n / x)))).repartition( 8 ).flatMap(kv = > kv. _ 2 .map( _ * kv. _ 1 ))
|
新的DAG可视化图看起来比之前更加复杂,因为repartition操作会有shuffle操作,所有增加了一个stage。
Stage 0和之前一样,新的 Stage 1看起来和 Stage 0也很类似,每个task大约都处理250000条记录,并且花费1s的时间。 Stage 2是比较重要的stage,下面是其截图:
从上图可以看出,现在的Stage 2比之前旧的Stage 1性能要好很多,这次Stage我们处理的数据和之前旧的Stage 1同样多,但是这次每个task花费的时候大概为5s,而且每个core得到了高效地使用。
两个版本的代码最后一个Stage大概都运行了6s,所以第一个版本的代码运行了大约0.5 + 14 + 6 = ~21s
;而对数据进行重新分布之后,这次运行的时间大约为0.5 + 1 + 5 + 6 = ~13s
。虽然说修改后的代码需要做一些额外的计算(重新分布数据),但是这个修改却减少了总的运行时间,因为它使得我们可以更加高效地使用我们的资源。
当然,如果你的目标是寻找质数,有比这里介绍的更加高效的算法。但是本文仅仅是用来介绍考虑Spark数据的分布是多么地重要。增加.repartition
函数将会增加Spark总体的工作,但好处可以显著大于成本
本文翻译自:Improving Spark Performance With Partitioning
本博客文章除特别声明,全部都是原创!
尊重原创,转载请注明: 转载自过往记忆(http://www.iteblog.com/)
本文链接: 【通过分区(Partitioning)提高Spark的运行性能】(http://www.iteblog.com/archives/1695)
通过分区(Partitioning)提高Spark的运行性能相关推荐
- 计算质数通过分区(Partition)提高Spark的运行性能(转载+自己理解)
这篇博客是对[1]的进一步详细描述 自己的配置是台式机一台+笔记本组成spark集群 #-------------------------------------------------------- ...
- Spark SQL运行流程及性能优化:RBO和CBO
1 Spark SQL运行流程 1.1 Spark SQL核心--Catalyst Spark SQL的核心是Catalyst查询编译器,它将用户程序中的SQL/Dataset/DataFrame经过 ...
- oracle分区表执行计划分区合并,利用ORACLE分区技术提高管理和性能_PART2
接PART1:http://blog.chinaunix.net/uid/7655508.html 11g interval分区: 1)11g之前创建日期范围分区,经常是预先创建一部分,等即将用完重新 ...
- Webpack5优化之提高代码运行性能(Preload、Network Cache、Core-js、PWA)
文章目录 一.Preload/Prefetch 1.1 为什么 1.2 是什么 1.2.1 共同点: 1.2.2 区别: 1.2.3 问题 1.2.4 总结 1.3 怎么样 1.3.1 安装依赖 1. ...
- spark算子_Spark 性能优化(四)——程序开发调优
1.4 程序开发调优 Spark 性能优化的第一步,就是要在开发 Spark 作业的过程中注意和应用一些性能优化的基本原则.开发调优,就是要让大家了解以下一些 Spark 基本开发原则,包括:RDD ...
- Project Tungsten:让Spark将硬件性能压榨到极限
Project Tungsten:让Spark将硬件性能压榨到极限 摘要:对于Spark来说,通用只是其目标之一,更好的性能同样是其赖以生存的立足之本.北京时间4月28日晚,Databricks ...
- 建立合理的索引提高SQL Server的性能
建立合理的索引提高SQL Server的性能- 标签:索引,性能优化 建立合理的索引提高SQL Server的性能 在应用系统中,尤其在联机事务处理系统中,对数据查询及处理速度已成为衡量应用系统成败的 ...
- spark 应用程序性能优化经验
一 常规性能调优 1 . 分配更多资源 --num-executors 3 \ 配置executor的数量 --driver-memory 100m \ 配置driver的内存(影响不大) --e ...
- Spark:运行原理 图解
Spark应用程序以进程集合为单位在分布式集群上运行,通过driver程序的main方法创建的SparkContext对象与集群交互. 1.Spark通过SparkContext向Cluster ma ...
最新文章
- 【相机标定】四个坐标系之间的变换关系
- spring (由Rod Johnson创建的一个开源框架)
- WPF中的动画——(二)From/To/By 动画
- js发送邮件确定email地址
- ACS AD 和本地验证SSL ×××
- CTFshow 命令执行 web53
- 隐马尔可夫模型中的Viterbi算法zz
- Apple 预计于内华达州雷诺市再盖一个数据中心
- 前端学习(360):svn操作前期连接工作
- 炎热天气看书还是钓鱼?隐马尔科夫模型教你预测!
- 对Moss 2007中访问群体的设置和使用补充
- vs生成命令和属性的宏
- python安装教程-PyCharm 安装教程(Windows)
- 【定位问题】基于matlab三维chan算法求解室内定位问题【含Matlab源码 580期】
- python中函数包括_python中有哪些函数
- 质量管理 六西格玛-黑带大师
- 交叉编译工具链的安装以及介绍
- cmd查看自己的CPU参数
- 网页回到顶部 GoTop 按钮自动隐藏
- 计算机基础知识第一章测试题,计算机基础知识测试题第一章
热门文章
- fscokopen 中执行超时 使用stream_set_timeout设置超时
- [RMAN]数据库全部介质恢复
- linux之dos2unix命令
- Git教程——入门基础
- Java Programming Test Question 2
- Linux BASH多进程并行处理的方法实现
- 发布NGuestBook(一个基于.NET平台的分层架构留言本小系统)
- 设计Twitter时间轴并进行搜索
- python连接传感器_树莓派4B之光敏传感器模块(python3)
- mac合上盖子不锁屏_macbook合上盖子不断网的设置