今天有同事问起Spark中spark.default.parallelism参数的意义,以及该如何设置。故在这里留个记录,算是做个小结。

Spark并行度设置的相关因素

Spark并行度的设置在Spark任务中是常常会谈及的问题,它是由partition的数量决定的。而partition的数量是由不同的因素决定的,它和资源的总cores、spark.default.parallelism参数、读取数据源的类型等有关系,在不同情况下,某个因素就起着主要的作用。下面看下Spark读取HDFS文本的并行度设置。

Spark读取HDFS文本的并行度设置

spark读取HDFS文本确定parition的方式,和前辈mapreduce的方式核心原理是一致的,只是在获取defaultMinPartitions的时候有所不同。

  def textFile(path: String,minPartitions: Int = defaultMinPartitions): RDD[String] = withScope {assertNotStopped()hadoopFile(path, classOf[TextInputFormat], classOf[LongWritable], classOf[Text],minPartitions).map(pair => pair._2.toString).setName(path)}

如果spark.default.parallslism有设置,defaultPartitions就会取设置的这个值。如果没有设置,则会根据分配给任务的总的cores数量和2比较后取最大值:

  override def defaultParallelism(): Int = {conf.getInt("spark.default.parallelism", math.max(totalCoreCount.get(), 2))}

defaultMinPartitions会再取个最小值:

def defaultMinPartitions: Int = math.min(defaultParallelism, 2)

hadoopFile()中实例化了HadoopRDD,计算parition数量会调用getPartitions(),然后熟悉的过程就发生了。

  override def getPartitions: Array[Partition] = {val jobConf = getJobConf()// add the credentials here as this can be called before SparkContext initializedSparkHadoopUtil.get.addCredentials(jobConf)val inputFormat = getInputFormat(jobConf)val inputSplits = inputFormat.getSplits(jobConf, minPartitions)val array = new Array[Partition](inputSplits.size)for (i <- 0 until inputSplits.size) {array(i) = new HadoopPartition(id, i, inputSplits(i))}array}

HDFS文本属于FileInputFormat类型,所以这里就会动态调用FileInputFormat类的getSplits()方法,在这里算得了partition的数量。

先得到goalSize和minSize,供后面比较使用:

    long goalSize = totalSize / (numSplits == 0 ? 1 : numSplits);long minSize = Math.max(job.getLong(org.apache.hadoop.mapreduce.lib.input.FileInputFormat.SPLIT_MINSIZE, 1), minSplitSize);

文本是可切割的,那么喜蛋终于来了:

if (isSplitable(fs, path)) {long blockSize = file.getBlockSize();long splitSize = computeSplitSize(goalSize, minSize, blockSize);long bytesRemaining = length;while (((double) bytesRemaining)/splitSize > SPLIT_SLOP) {String[][] splitHosts = getSplitHostsAndCachedHosts(blkLocations,length-bytesRemaining, splitSize, clusterMap);splits.add(makeSplit(path, length-bytesRemaining, splitSize,splitHosts[0], splitHosts[1]));bytesRemaining -= splitSize;}if (bytesRemaining != 0) {String[][] splitHosts = getSplitHostsAndCachedHosts(blkLocations, length- bytesRemaining, bytesRemaining, clusterMap);splits.add(makeSplit(path, length - bytesRemaining, bytesRemaining,splitHosts[0], splitHosts[1]));}
} 

来看computeSplitSize()方法,这里确定了一个partition的大小(blockSize是HDFS文件块的大小,默认128M):

  protected long computeSplitSize(long goalSize, long minSize,long blockSize) {return Math.max(minSize, Math.min(goalSize, blockSize));}

默认minSize很小,设为1,这里也假设是默认值不变。

如果goalSize > blockSize,则splitSize值取blockSize,肯定会有2个以上的partition。

如果goalSize < blockSize,则splitSize值取goalSize,会产生1-2个partition。

Spark读取HDFS文本的parition数量得到以后,并行度也就确定了。

如何设置Spark并行度才是合理的?

Spark并行度对于提高Spark任务的运行效率是非常关键的。合理设置Spark并行度可以从几个方面考虑:

1、充分利用任务资源(并行度略高于分配给任务的cpu资源数Executors * 每个Executor使用的cores)

2、平均每个parition的大小不要太大不要过小,一般在百兆较合适

3、根据实际机器的分配给任务的资源和任务需要计算的数据量大小,根据上面两点进行权衡设置。

Spark并行度的设定相关推荐

  1. 谈谈spark.sql.shuffle.partitions和 spark.default.parallelism 的区别及spark并行度的理解

    谈谈spark.sql.shuffle.partitions和 spark.default.parallelism 的区别及spark并行度的理解 spark.sql.shuffle.partitio ...

  2. spark并行度(parallelism)和分区(partition)未生效的问题

    spark的并行度对spark的性能是又很大的影响的,spark任务能快速计算主要就是因为内存计算和并行计算. 对于并行计算,我们就要涉及到并行度的问题,那并行度跟什么有关系呢? 源数据 hdfs文件 ...

  3. 从spark.default.parallelism参数来看Spark并行度、并行计算任务概念

    1 并行度概念理解 并行度:并行度= partition= task总数.但是同一时刻能处理的task数量由并行计算任务决定(CPU cores决定). 并行度(Parallelism)指的是分布式数 ...

  4. Spark 并行度和分区的关系

    合理设置并行度可以充分利用集群的资源,加快运行速度.官方给的建议是设置你并行度是你集群所有core的2到3倍.比如说你的集群有50个Executor,每一个executor有3个core,那就是总共1 ...

  5. Spark并行度和任务调度

    文章目录 并行度 如何设置并行度 如何规划我们自己群集环境的并行度? Spark的任务调度 并行度 Spark之间的并行就是在同一时间内,有多少个Task在同时运行.并行度也就是并行能力的设置,假设并 ...

  6. [Spark基础]-- spark并行度和partion联系

    一.问题 1.怎样提高并行度? 几种方式:(1)reduce时,输入参数(int)   (2)partitionBy()输入分区数  (3)SparkContext.textFile(path,num ...

  7. Spark RDD并行度与分区设置

    默认情况下,Spark 可以将一个作业切分多个任务后,发送给 Executor 节点并行计算,而能 够并行计算的任务数量我们称之为并行度.这个数量可以在构建 RDD 时指定.记住,这里 的并行执行的任 ...

  8. PySpark | RDD持久化 | 共享变量 | Spark内核调度

    文章目录 一.RDD持久化 1.RDD的数据是过程数据 2.RDD缓存 2.1 RDD缓存的特点 2.2 cache()与unpersist()实战 3.RDD CheckPoint 3.1 Chec ...

  9. Spark面试精选题(03)

    1.Spark master使用zookeeper进行HA的,有哪些元数据保存在Zookeeper? 答:spark通过这个参数spark.deploy.zookeeper.dir指定master元数 ...

最新文章

  1. 开发日记-20190424 关键词 阶段性开发心得和小结
  2. Halcon算子:min_max_gray和gray_histo的区别
  3. Problem G. Pandaria(线段树合并 + Kruskal 重构树)
  4. jvm需要多长时间进行转义分析? 可能比您想象的要长。
  5. Mapreduce的工作流程
  6. c语言指针慕课,C语言-指针
  7. Docker-服务安装
  8. 16_使用开源项目下载文件
  9. day-60Django
  10. “电”亮数字生活,阿里云助力南方电网智能调度
  11. vos3000_v7.x版本的快速安装方法
  12. 【通信】基于量子密钥分发密钥率仿真含Matlab源码
  13. Wpf初学 ---03设计一个优美的注册登录界面(连接数据库)
  14. NX入门到提高全部视频教程
  15. 大写汉字转阿拉伯数字c语言,把中文汉字大写数字 转换成 阿拉伯数字
  16. word文档,中文输入模式下打出英文标点原因(微软拼音输入法为例)
  17. 一文掌握汽车总线CAN帧报文
  18. Linux: SSH免密登录配置完了不生效
  19. 基于openmv的小车
  20. 后台启动zookeeper

热门文章

  1. Windows环境下安装pkg-config
  2. 当YOLOv5遇见OpenVINO!
  3. Early Convolutions Help Transformers See Better
  4. 【Unity Shader】(2)半兰伯特模型 构建光照
  5. 云之讯手机号短信验证
  6. 【mp3】洗脑循环了!龙珠超 自在极意功 【究极の圣戦】串田アキラ 背景纯音乐...
  7. 机制分析:基于简易的时间片轮转多道程序的 linux 内核
  8. Opencv存图读图
  9. 【Linux 内核 内存管理】物理分配页 ⑧ ( __alloc_pages_slowpath 慢速路径调用函数源码分析 | 获取首选内存区域 | 异步回收内存页 | 最低水线也分配 | 直接分配 )
  10. react源码中的fiber架构