Flow Analysis

The entry point:

org.apache.spark.scheduler.ShuffleMapTask.runTask
override def runTask(context: TaskContext): MapStatus = {
  // Deserialize the RDD using the broadcast variable.
  val threadMXBean = ManagementFactory.getThreadMXBean
  val deserializeStartTime = System.currentTimeMillis()
  val deserializeStartCpuTime = if (threadMXBean.isCurrentThreadCpuTimeSupported) {
    threadMXBean.getCurrentThreadCpuTime
  } else 0L
  val ser = SparkEnv.get.closureSerializer.newInstance()
  val (rdd, dep) = ser.deserialize[(RDD[_], ShuffleDependency[_, _, _])](
    ByteBuffer.wrap(taskBinary.value), Thread.currentThread.getContextClassLoader)
  _executorDeserializeTime = System.currentTimeMillis() - deserializeStartTime
  _executorDeserializeCpuTime = if (threadMXBean.isCurrentThreadCpuTimeSupported) {
    threadMXBean.getCurrentThreadCpuTime - deserializeStartCpuTime
  } else 0L

  var writer: ShuffleWriter[Any, Any] = null
  try {
    val manager = SparkEnv.get.shuffleManager
    writer = manager.getWriter[Any, Any](dep.shuffleHandle, partitionId, context)
    writer.write(rdd.iterator(partition, context).asInstanceOf[Iterator[_ <: Product2[Any, Any]]])
    writer.stop(success = true).get
  } catch {
    case e: Exception =>
      try {
        if (writer != null) {
          writer.stop(success = false)
        }
      } catch {
        case e: Exception =>
          log.debug("Could not stop writer", e)
      }
      throw e
  }
}

The manager obtained here from SparkEnv.get.shuffleManager is a SortShuffleManager.

First, note that private[spark] trait ShuffleManager is only an interface (a trait).

SortShuffleManager is the implementation of it that we care about:

private[spark] class SortShuffleManager(conf: SparkConf) extends ShuffleManager with Logging
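Before going further, it is worth noting how SparkEnv ends up holding this implementation in the first place. The sketch below is a hedged paraphrase of the selection logic in SparkEnv.create (Spark 2.x), not verbatim source; resolveShuffleManagerClass is a hypothetical helper name, not a Spark API.

import org.apache.spark.SparkConf

// Hedged paraphrase: "spark.shuffle.manager" defaults to "sort", and both the "sort"
// and "tungsten-sort" short names resolve to SortShuffleManager, which SparkEnv then
// instantiates by reflection with the SparkConf.
def resolveShuffleManagerClass(conf: SparkConf): String = {
  val shortNames = Map(
    "sort"          -> "org.apache.spark.shuffle.sort.SortShuffleManager",
    "tungsten-sort" -> "org.apache.spark.shuffle.sort.SortShuffleManager")
  val name = conf.get("spark.shuffle.manager", "sort")
  // Unknown values are treated as the fully-qualified class name of a custom ShuffleManager.
  shortNames.getOrElse(name.toLowerCase, name)
}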

In the path we follow, the writer handed out by this manager will be an org.apache.spark.shuffle.sort.SortShuffleWriter.

Let's see how getWriter obtains the writer that will eventually drive the disk-writing sorter:

override def getWriter[K, V](
    handle: ShuffleHandle,
    mapId: Int,
    context: TaskContext): ShuffleWriter[K, V] = {
  numMapsForShuffle.putIfAbsent(
    handle.shuffleId, handle.asInstanceOf[BaseShuffleHandle[_, _, _]].numMaps)
  val env = SparkEnv.get
  handle match {
    case unsafeShuffleHandle: SerializedShuffleHandle[K @unchecked, V @unchecked] =>
      new UnsafeShuffleWriter(
        env.blockManager,
        shuffleBlockResolver.asInstanceOf[IndexShuffleBlockResolver],
        context.taskMemoryManager(),
        unsafeShuffleHandle,
        mapId,
        context,
        env.conf)
    case bypassMergeSortHandle: BypassMergeSortShuffleHandle[K @unchecked, V @unchecked] =>
      new BypassMergeSortShuffleWriter(
        env.blockManager,
        shuffleBlockResolver.asInstanceOf[IndexShuffleBlockResolver],
        bypassMergeSortHandle,
        mapId,
        context,
        env.conf)
    case other: BaseShuffleHandle[K @unchecked, V @unchecked, _] =>
      new SortShuffleWriter(shuffleBlockResolver, other, mapId, context)
  }
}
The match distinguishes the two specialized handle subclasses (plus the base case); how the handle type is chosen at registerShuffle time is sketched after their definitions below.
/**
 * Subclass of [[BaseShuffleHandle]], used to identify when we've chosen to use the
 * serialized shuffle.
 */
private[spark] class SerializedShuffleHandle[K, V](
    shuffleId: Int,
    numMaps: Int,
    dependency: ShuffleDependency[K, V, V])
  extends BaseShuffleHandle(shuffleId, numMaps, dependency) {
}

/**
 * Subclass of [[BaseShuffleHandle]], used to identify when we've chosen to use the
 * bypass merge sort shuffle path.
 */
private[spark] class BypassMergeSortShuffleHandle[K, V](
    shuffleId: Int,
    numMaps: Int,
    dependency: ShuffleDependency[K, V, V])
  extends BaseShuffleHandle(shuffleId, numMaps, dependency) {
}
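For completeness, here is a hedged sketch of how the handle, and therefore the writer, is chosen when the shuffle is registered. It condenses, from memory, the checks done by SortShuffleManager.registerShuffle, SortShuffleWriter.shouldBypassMergeSort and SortShuffleManager.canUseSerializedShuffle in Spark 2.x; chooseHandleKind and its parameter names are made up for illustration, and the thresholds are the defaults as I recall them.

// Hedged sketch, not Spark source. Decision order mirrored from registerShuffle:
//   1. no map-side combine and few enough reduce partitions  -> bypass-merge-sort handle
//   2. relocatable serializer, no aggregator, <= 2^24 parts  -> serialized (unsafe) handle
//   3. everything else                                       -> base handle (SortShuffleWriter)
def chooseHandleKind(
    mapSideCombine: Boolean,
    hasAggregator: Boolean,
    serializerSupportsRelocation: Boolean,
    numReducePartitions: Int,
    bypassMergeThreshold: Int = 200): String = {  // spark.shuffle.sort.bypassMergeThreshold default
  if (!mapSideCombine && numReducePartitions <= bypassMergeThreshold) {
    "BypassMergeSortShuffleHandle -> BypassMergeSortShuffleWriter"
  } else if (serializerSupportsRelocation && !hasAggregator && numReducePartitions <= (1 << 24)) {
    "SerializedShuffleHandle -> UnsafeShuffleWriter"
  } else {
    "BaseShuffleHandle -> SortShuffleWriter"
  }
}

// e.g. a reduceByKey with map-side combine and 1000 reduce partitions falls through to the base handle:
// chooseHandleKind(mapSideCombine = true, hasAggregator = true,
//                  serializerSupportsRelocation = false, numReducePartitions = 1000)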
Now take a look at BaseShuffleHandle:
/**
 * A basic ShuffleHandle implementation that just captures registerShuffle's parameters.
 */
private[spark] class BaseShuffleHandle[K, V, C](
    shuffleId: Int,
    val numMaps: Int,
    val dependency: ShuffleDependency[K, V, C])
  extends ShuffleHandle(shuffleId)
Looking one level further:
abstract class ShuffleHandle(val shuffleId: Int) extends Serializable {}
It is an abstract class that extends Serializable.

Continuing with the case we follow:
new SortShuffleWriter(shuffleBlockResolver, other, mapId, context)
The class is defined as:
private[spark] class SortShuffleWriter[K, V, C](
    shuffleBlockResolver: IndexShuffleBlockResolver,
    handle: BaseShuffleHandle[K, V, C],
    mapId: Int,
    context: TaskContext)
  extends ShuffleWriter[K, V] with Logging
Then comes the write operation:
/** Write a bunch of records to this task's output */
override def write(records: Iterator[Product2[K, V]]): Unit = {
  sorter = if (dep.mapSideCombine) {
    require(dep.aggregator.isDefined, "Map-side combine without Aggregator specified!")
    new ExternalSorter[K, V, C](
      context, dep.aggregator, Some(dep.partitioner), dep.keyOrdering, dep.serializer)
  } else {
    // In this case we pass neither an aggregator nor an ordering to the sorter, because we don't
    // care whether the keys get sorted in each partition; that will be done on the reduce side
    // if the operation being run is sortByKey.
    new ExternalSorter[K, V, V](
      context, aggregator = None, Some(dep.partitioner), ordering = None, dep.serializer)
  }
  sorter.insertAll(records)

  // Don't bother including the time to open the merged output file in the shuffle write time,
  // because it just opens a single file, so is typically too fast to measure accurately
  // (see SPARK-3570).
  val output = shuffleBlockResolver.getDataFile(dep.shuffleId, mapId)
  val tmp = Utils.tempFileWith(output)
  try {
    val blockId = ShuffleBlockId(dep.shuffleId, mapId, IndexShuffleBlockResolver.NOOP_REDUCE_ID)
    val partitionLengths = sorter.writePartitionedFile(blockId, tmp)
    shuffleBlockResolver.writeIndexFileAndCommit(dep.shuffleId, mapId, partitionLengths, tmp)
    mapStatus = MapStatus(blockManager.shuffleServerId, partitionLengths)
  } finally {
    if (tmp.exists() && !tmp.delete()) {
      logError(s"Error while deleting temp file ${tmp.getAbsolutePath}")
    }
  }
}

The path we analyze assumes map-side combine is needed, i.e. the first branch above is taken:

sorter = if (dep.mapSideCombine) {
  require(dep.aggregator.isDefined, "Map-side combine without Aggregator specified!")
  new ExternalSorter[K, V, C](
    context, dep.aggregator, Some(dep.partitioner), dep.keyOrdering, dep.serializer)
}
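As a concrete reference point (an assumption about typical usage, not something from the original post): operators built on combineByKeyWithClassTag, such as reduceByKey, request map-side combine by default, while groupByKey disables it, so a plain reduceByKey job is what drives this branch. A minimal runnable sketch of such a job, with purely illustrative names and numbers:

import org.apache.spark.{SparkConf, SparkContext}

// Minimal local job whose shuffle dependency has mapSideCombine = true,
// so SortShuffleWriter builds its ExternalSorter with an aggregator.
object MapSideCombineDemo {
  def main(args: Array[String]): Unit = {
    val sc = new SparkContext(
      new SparkConf().setMaster("local[2]").setAppName("map-side-combine-demo"))
    val counts = sc.parallelize(Seq(("a", 1), ("b", 2), ("a", 3)), numSlices = 4)
      .reduceByKey(_ + _)   // combineByKey-based, map-side combine enabled by default
      .collect()
    println(counts.mkString(", "))   // e.g. (a,4), (b,2)
    sc.stop()
  }
}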

Next, the map output is inserted into the sorter:

sorter.insertAll(records)

For reference, the sorter field in SortShuffleWriter is declared as:

private var sorter: ExternalSorter[K, V, _] = null
def insertAll(records: Iterator[Product2[K, V]]): Unit = {
  // TODO: stop combining if we find that the reduction factor isn't high
  val shouldCombine = aggregator.isDefined
  if (shouldCombine) {
    // Combine values in-memory first using our AppendOnlyMap
    val mergeValue = aggregator.get.mergeValue
    val createCombiner = aggregator.get.createCombiner
    var kv: Product2[K, V] = null
    val update = (hadValue: Boolean, oldValue: C) => {
      if (hadValue) mergeValue(oldValue, kv._2) else createCombiner(kv._2)
    }
    while (records.hasNext) {
      addElementsRead()
      kv = records.next()
      map.changeValue((getPartition(kv._1), kv._1), update)
      maybeSpillCollection(usingMap = true)
    }
  } else {
    // Stick values into our buffer
    while (records.hasNext) {
      addElementsRead()
      val kv = records.next()
      buffer.insert(getPartition(kv._1), kv._1, kv._2.asInstanceOf[C])
      maybeSpillCollection(usingMap = false)
    }
  }
}
(insertAll is defined in org.apache.spark.util.collection.ExternalSorter.)

The core loop of insertAll on the combine path boils down to this:

while (records.hasNext) {
  addElementsRead()
  kv = records.next()
  map.changeValue((getPartition(kv._1), kv._1), update)
  maybeSpillCollection(usingMap = true)
}
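If the AppendOnlyMap/changeValue machinery is unfamiliar, the following toy model (plain Scala collections with made-up names, not Spark's data structure) shows what the update closure does for each incoming record:

import scala.collection.mutable

// Toy model of the changeValue/update pattern: merge into the existing combiner if the
// key was seen before, otherwise create a fresh combiner from the value.
def insertAllToy[K, V, C](
    records: Iterator[(K, V)],
    createCombiner: V => C,
    mergeValue: (C, V) => C): mutable.Map[K, C] = {
  val map = mutable.Map.empty[K, C]
  records.foreach { case (k, v) =>
    map.update(k, map.get(k) match {
      case Some(c) => mergeValue(c, v)   // hadValue == true
      case None    => createCombiner(v)  // hadValue == false
    })
  }
  map
}

// e.g. a word-count style combine:
// insertAllToy(Iterator("a" -> 1, "a" -> 2, "b" -> 5), (v: Int) => v, (c: Int, v: Int) => c + v)
// yields Map("a" -> 3, "b" -> 5)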
The relevant ExternalSorter fields and settings on this path are:

private val fileBufferSize = conf.getSizeAsKb("spark.shuffle.file.buffer", "32k").toInt * 1024

// Size of object batches when reading/writing from serializers.
//
// Objects are written in batches, with each batch using its own serialization stream. This
// cuts down on the size of reference-tracking maps constructed when deserializing a stream.
//
// NOTE: Setting this too low can cause excessive copying when serializing, since some serializers
// grow internal data structures by growing + copying every time the number of objects doubles.
private val serializerBatchSize = conf.getLong("spark.shuffle.spill.batchSize", 10000)

// Data structures to store in-memory objects before we spill. Depending on whether we have an
// Aggregator set, we either put objects into an AppendOnlyMap where we combine them, or we
// store them in an array buffer.
@volatile private var map = new PartitionedAppendOnlyMap[K, C]
@volatile private var buffer = new PartitionedPairBuffer[K, C]
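Both settings referenced above are ordinary SparkConf entries, so they can be overridden when submitting a job; a hedged example with purely illustrative values (not tuning advice):

import org.apache.spark.SparkConf

// Illustrative values only.
val tunedConf = new SparkConf()
  .set("spark.shuffle.file.buffer", "64k")        // per-writer buffer, default 32k above
  .set("spark.shuffle.spill.batchSize", "10000")  // objects per serialization batch, default 10000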

The map here is a PartitionedAppendOnlyMap, an entirely in-memory structure. A spill is only triggered once it is "full" (its estimated size reaches the current memory threshold); as you can see, maybeSpillCollection is called after every update to the PartitionedAppendOnlyMap to perform that check.
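The spill decision itself lives in maybeSpill, in the Spillable base class. The sketch below is a hedged paraphrase of that logic as of Spark 2.x, not the verbatim source; maybeSpillSketch and its parameters are made-up names, the check-every-32-elements cadence is quoted from memory, and the force-spill-by-element-count escape hatch is omitted.

// Hedged paraphrase of Spillable.maybeSpill: every 32 elements, if the estimated size of the
// in-memory collection has reached the current threshold, try to acquire more execution memory;
// only if that fails to raise the threshold above the current size does an actual spill happen.
def maybeSpillSketch(
    elementsRead: Long,
    estimatedCollectionBytes: Long,
    currentThresholdBytes: Long,
    tryAcquireMemory: Long => Long): (Boolean, Long) = {
  var threshold = currentThresholdBytes
  var shouldSpill = false
  if (elementsRead % 32 == 0 && estimatedCollectionBytes >= threshold) {
    val amountToRequest = 2 * estimatedCollectionBytes - threshold
    threshold += tryAcquireMemory(amountToRequest)   // the memory manager may grant less than requested
    shouldSpill = estimatedCollectionBytes >= threshold
  }
  (shouldSpill, threshold)   // the caller spills the collection to disk when shouldSpill is true
}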

Once a spill happens, the resulting file is named:

    "temp_shuffle_" + id

The logic is here:

val (blockId, file) = diskBlockManager.createTempShuffleBlock()

def createTempShuffleBlock(): (TempShuffleBlockId, File) = {
  var blockId = new TempShuffleBlockId(UUID.randomUUID())
  while (getFile(blockId).exists()) {
    blockId = new TempShuffleBlockId(UUID.randomUUID())
  }
  (blockId, getFile(blockId))
}

All of the spill files produced are recorded in an array:

  private val spills = new ArrayBuffer[SpilledFile]

After the partition data for one task has been fully iterated, a merge is performed: the on-disk spill files and the in-memory data are processed together, yielding a new iterator whose elements look like this:

(p, mergeWithAggregation(
  iterators, aggregator.get.mergeCombiners, keyComparator, ordering.isDefined))

Here p is the reduce-side partitionId, and all of the data belonging to p is contained in its corresponding iterator.
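A toy model of what that merge produces (plain Scala collections and made-up names, not Spark's spill-file merge code): each "spill" below is assumed to be already grouped by reduce partition id, as Spark's spill files are, and per-partition records are combined with mergeCombiners, matching the shape described above.

// Toy merge: produce (reducePartitionId, merged records for that partition), the same shape
// as ExternalSorter.partitionedIterator, from several already-partitioned "spills".
def mergeSpillsToy[K, C](
    spills: Seq[Map[Int, Seq[(K, C)]]],
    mergeCombiners: (C, C) => C): Iterator[(Int, Iterator[(K, C)])] = {
  val partitionIds = spills.flatMap(_.keys).distinct.sorted
  partitionIds.iterator.map { p =>
    val merged = spills.flatMap(_.getOrElse(p, Seq.empty))
      .groupBy(_._1)
      .map { case (k, kvs) => k -> kvs.map(_._2).reduce(mergeCombiners) }
    p -> merged.iterator
  }
}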

Next, the final output file is obtained:

val outputFile = shuffleBlockResolver.getDataFile(dep.shuffleId, mapId)

The file name follows this format:

 "shuffle_" + shuffleId + "_" + mapId + "_" + reduceId + ".data"

Here reduceId is the fixed constant NOOP_REDUCE_ID, whose value is 0.

Then the actual write to the file begins:

   val partitionLengths = sorter.writePartitionedFile(blockId, context, outputFile)

The writing process looks like this:

for ((id, elements) <- this.partitionedIterator) {
  if (elements.hasNext) {
    val writer = blockManager.getDiskWriter(
      blockId, outputFile, serInstance, fileBufferSize,
      context.taskMetrics.shuffleWriteMetrics.get)
    for (elem <- elements) {
      writer.write(elem._1, elem._2)
    }
    writer.commitAndClose()
    val segment = writer.fileSegment()
    lengths(id) = segment.length
  }
}

As noted above, this.partitionedIterator yields pairs of (reduce partitionId, iterator over the actual records), so the records of each reduce partition are written out sequentially. Each finished partition becomes a FileSegment, and its length (and hence its offset) is recorded, so that each reducer can later fetch exactly the byte range it needs. The file name, as mentioned earlier, is:

"shuffle_" + shuffleId + "_" + mapId + "_" + NOOP_REDUCE_ID + ".data"

The offsets we just mentioned only live in memory at this point, so they need to be persisted next; this is done by writeIndexFile:

 shuffleBlockResolver.writeIndexFile(dep.shuffleId,mapId, partitionLengths)

The resulting index file name is:

  "shuffle_" + shuffleId + "_" + mapId + "_" + NOOP_REDUCE_ID + ".index"

At this point, the write operation for one task is complete, and it corresponds to a single data file.

Final Conclusion

So the final conclusion is that the number of shuffle files an Executor ultimately ends up with is:

MapNum (note: index files not included)

and the maximum number of files held open and being written to at the same time is:

 CoreNum
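A back-of-the-envelope example with assumed, purely hypothetical numbers (not from the original analysis):

// Hypothetical figures for one executor running a 1000-map-task stage on 8 cores.
val mapTasksOnExecutor = 1000
val executorCores      = 8

val dataFiles         = mapTasksOnExecutor   // one shuffle_<shuffleId>_<mapId>_0.data per map task
val indexFiles        = mapTasksOnExecutor   // one matching .index file (excluded from MapNum above)
val maxOpenForWriting = executorCores        // at most one active writer per concurrently running task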
