ShuffleWriter

  • 1. Overview
  • 2. ShuffleHandle Registration
    • 2.1. When Registration Happens
    • 2.2. Registering the Shuffle with the ShuffleManager
      • 2.2.1. BypassMergeSortShuffleHandle Check
      • 2.2.2. SerializedShuffleHandle Check
  • 3. ShuffleWriter Instantiation
    • 3.1. When Instantiation Happens
      • 3.1.1. Executor#run - the executor runs the task
      • 3.1.2. Instantiating the ShuffleWriter Object
  • 4. BypassMergeSortShuffleWriter
    • 4.1. Instantiation
    • 4.2. write
      • 4.2.1. writePartitionedFile - merging temporary files
  • 5. SortShuffleWriter
    • 5.1. Instantiation
    • 5.2. write
      • 5.2.1. ExternalSorter#insertAll - adding data to the ExternalSorter
        • 5.2.1.1. ExternalSorter#maybeSpillCollection - deciding whether to spill cached data to disk
        • 5.2.1.2. ExternalSorter#spill - spilling cached data to disk
      • 5.2.2. ExternalSorter#writePartitionedFile - producing the complete data file
        • 5.2.2.1. ExternalSorter#partitionedIterator - merging on-disk and in-memory data
  • 6. UnsafeShuffleWriter
    • 6.1. Instantiation
    • 6.2. write
      • 6.2.1. insertRecordIntoSorter - adding data to the sorter
        • 6.2.1.1. ShuffleExternalSorter#insertRecord
          • 6.2.1.1.1. ShuffleExternalSorter#spill - spilling cached data to disk
            • 6.2.1.1.1.1. writeSortedFile - spilling data to a temporary file
            • 6.2.1.1.1.2. Sorting the spilled data
          • 6.2.1.1.2. growPointerArrayIfNecessary - growing the in-memory sorter buffer
            • 6.2.1.1.2.1. Deciding whether to grow
            • 6.2.1.1.2.2. Computing memory usage
            • 6.2.1.1.2.3. allocateArray - growing the array
            • 6.2.1.1.2.4. expandPointerArray - migrating the data
          • 6.2.1.1.3. acquireNewPageIfNecessary - acquiring a new page to store data
      • 6.2.2. closeAndWriteOutput - flushing the external sorter's cached data to disk
        • 6.2.2.1. closeAndGetSpills - force-flushing the cache to disk
        • 6.2.2.2. mergeSpills - merging spill files
    • 6.3. Sorters
      • 6.3.1. ShuffleExternalSorter - the external sorter
        • 6.3.1.1. ShuffleInMemorySorter - the in-memory sorter
          • 6.3.1.1.1. Computing usable capacity
    • 6.4. Summary
  • 7. Creating the index file for the data file
  • 8. Summary
  • 9. References

1. Overview

This analysis is based on the Spark 2.x source code (Scala 2.11 build);

Shuffle in Spark is one large framework; this post focuses on how the ShuffleWriter works inside that framework;

2. ShuffleHandle Registration

2.1. When Registration Happens

  • shuffleHandle is one of the fields of the wide dependency ShuffleDependency;
  • when the wide-dependency object is instantiated, it registers a handle with the shuffleManager, and the returned handle initializes the shuffleHandle field;
  • registering the handle with the shuffleManager instantiates a concrete ShuffleHandle object;
class ShuffleDependency[K: ClassTag, V: ClassTag, C: ClassTag](@transient private val _rdd: RDD[_ <: Product2[K, V]],val partitioner: Partitioner,val serializer: Serializer = SparkEnv.get.serializer,val keyOrdering: Option[Ordering[K]] = None,val aggregator: Option[Aggregator[K, V, C]] = None,val mapSideCombine: Boolean = false)extends Dependency[Product2[K, V]] {override def rdd: RDD[Product2[K, V]] = _rdd.asInstanceOf[RDD[Product2[K, V]]]val shuffleId: Int = _rdd.context.newShuffleId()//向shuffleManager注册handle,并返回handle初始化shuffleHandle属性val shuffleHandle: ShuffleHandle = _rdd.context.env.shuffleManager.registerShuffle(shuffleId, _rdd.partitions.length, this)
}

2.2. Registering the Shuffle with the ShuffleManager

Notes:

  • in Spark, SortShuffleManager is the only implementation of ShuffleManager, so registering a shuffle is ultimately done by SortShuffleManager;
  • when it happens:
    • while the ShuffleDependency object is being instantiated, initializing its shuffleHandle field registers the shuffle with the shuffleManager;

Summary (a worked example of which handle a few common operators end up with follows the source listing below):

  • BypassMergeSortShuffleHandle:
    • applies when: no map-side aggregation and the partition count is at most 200
    • effect: writes numPartitions files directly and concatenates them at the end
    • advantage: avoids serializing and deserializing twice in order to merge spilled files
    • drawback: many files are open at the same time, so more memory is allocated for buffers;
  • SerializedShuffleHandle:
    • applies when:
      • the serializer supports relocation of serialized objects;
      • no map-side aggregation;
      • the partition count is at most 16777216;
    • effect: buffers map output in serialized form
  • BaseShuffleHandle:
    • applies when: neither of the two handles above applies;
    • effect: buffers map output in deserialized form
    • feature: supports map-side aggregation
private[spark] class SortShuffleManager(conf: SparkConf) extends ShuffleManager with Logging {override def registerShuffle[K, V, C](shuffleId: Int,numMaps: Int,dependency: ShuffleDependency[K, V, C]): ShuffleHandle = {//不是map端聚合且分区数不高于200if (SortShuffleWriter.shouldBypassMergeSort(conf, dependency)) {// 直接写入numPartitions文件,并在最后将它们连接起来//这避免了进行两次序列化和反序列化以合并溢出的文件,这在正常的代码路径中会发生。缺点是一次打开多个文件,从而为缓冲区分配更多内存。new BypassMergeSortShuffleHandle[K, V](shuffleId, numMaps, dependency.asInstanceOf[ShuffleDependency[K, V, V]])} //序列化器支持对象迁移、非map端聚合、分区数不大于16777216else if (SortShuffleManager.canUseSerializedShuffle(dependency)) {// 以序列化的形式缓冲映射输出new SerializedShuffleHandle[K, V](shuffleId, numMaps, dependency.asInstanceOf[ShuffleDependency[K, V, V]])} else {// 以反序列化的形式缓冲映射输出new BaseShuffleHandle(shuffleId, numMaps, dependency)}}
}
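
As a quick illustration, the sketch below re-implements the selection rules in a simplified, standalone form (this is not the Spark source; chooseHandle and its parameters are made up here) and applies them to a few typical operators, assuming default configuration and a relocatable serializer such as Kryo:

object HandleChoiceSketch {
  // Simplified re-implementation of the three registration rules described above.
  def chooseHandle(mapSideCombine: Boolean,
                   numPartitions: Int,
                   serializerRelocatable: Boolean,
                   bypassMergeThreshold: Int = 200): String = {
    val maxSerializedModePartitions = 1 << 24  // 16777216
    if (!mapSideCombine && numPartitions <= bypassMergeThreshold) "BypassMergeSortShuffleHandle"
    else if (serializerRelocatable && !mapSideCombine && numPartitions <= maxSerializedModePartitions) "SerializedShuffleHandle"
    else "BaseShuffleHandle"
  }

  def main(args: Array[String]): Unit = {
    // repartition(100): no map-side combine, few partitions -> Bypass
    println(chooseHandle(mapSideCombine = false, numPartitions = 100, serializerRelocatable = true))
    // groupByKey over 1000 partitions with a relocatable serializer -> Serialized
    println(chooseHandle(mapSideCombine = false, numPartitions = 1000, serializerRelocatable = true))
    // reduceByKey over 1000 partitions: map-side combine enabled -> Base
    println(chooseHandle(mapSideCombine = true, numPartitions = 1000, serializerRelocatable = true))
  }
}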

2.2.1. BypassMergeSortShuffleHandle Check

Requirements:

  • no map-side aggregation and the partition count is at most 200
  • the partition-count threshold is set by the spark.shuffle.sort.bypassMergeThreshold parameter; default 200; (a configuration sketch follows the source listing below)
private[spark] object SortShuffleWriter {
  // no map-side aggregation and the partition count is at most 200
  def shouldBypassMergeSort(conf: SparkConf, dep: ShuffleDependency[_, _, _]): Boolean = {
    // map-side aggregation needs sorting
    if (dep.mapSideCombine) {
      false
    } else {
      // spark.shuffle.sort.bypassMergeThreshold: default 200
      val bypassMergeThreshold: Int = conf.getInt("spark.shuffle.sort.bypassMergeThreshold", 200)
      // the partition count is at most spark.shuffle.sort.bypassMergeThreshold (200 by default)
      dep.partitioner.numPartitions <= bypassMergeThreshold
    }
  }
}
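
If the threshold needs tuning, it can be raised on the SparkConf before the SparkContext is created. A minimal sketch (for example pasted into spark-shell or a driver program); the value 400 is only an example:

import org.apache.spark.{SparkConf, SparkContext}

// Let shuffles with up to 400 reduce partitions (and no map-side combine) still take the bypass path.
val conf = new SparkConf()
  .setAppName("bypass-threshold-sketch")
  .setMaster("local[*]")
  .set("spark.shuffle.sort.bypassMergeThreshold", "400")
val sc = new SparkContext(conf)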

2.2.2. SerializedShuffleHandle Check

Requirements:

  • the serializer supports relocation of serialized objects;
  • no map-side aggregation;
  • the partition count is at most 16777216;
  • all three conditions must hold at the same time;
private[spark] object SortShuffleManager extends Logging {//16777216val MAX_SHUFFLE_OUTPUT_PARTITIONS_FOR_SERIALIZED_MODE =PackedRecordPointer.MAXIMUM_PARTITION_ID + 1//序列化器支持对象迁移、非map端聚合、分区数不大于16777216def canUseSerializedShuffle(dependency: ShuffleDependency[_, _, _]): Boolean = {val shufId = dependency.shuffleIdval numPartitions = dependency.partitioner.numPartitions//序列化器不支持对象迁移if (!dependency.serializer.supportsRelocationOfSerializedObjects) {log.debug(s"Can't use serialized shuffle for shuffle $shufId because the serializer, " +s"${dependency.serializer.getClass.getName}, does not support object relocation")false} //map端聚合else if (dependency.mapSideCombine) {log.debug(s"Can't use serialized shuffle for shuffle $shufId because we need to do " +s"map-side aggregation")false} //分区数大于16777216else if (numPartitions > MAX_SHUFFLE_OUTPUT_PARTITIONS_FOR_SERIALIZED_MODE) {log.debug(s"Can't use serialized shuffle for shuffle $shufId because it has more than " +s"$MAX_SHUFFLE_OUTPUT_PARTITIONS_FOR_SERIALIZED_MODE partitions")false} else {log.debug(s"Can use serialized shuffle for shuffle $shufId")true}}
}

3. ShuffleWriter Instantiation

3.1. When Instantiation Happens

When it is instantiated and used:

  • when an executor runs a shuffle task, the work is ultimately done by ShuffleMapTask.runTask();
  • inside ShuffleMapTask.runTask(), a ShuffleWriter object is instantiated, and ShuffleWriter.write() writes the data to disk;

3.1.1. Executor#run - the executor runs the task

Notes:

  • when the Executor runs a shuffle task, the underlying call is ShuffleMapTask.runTask();
private[spark] class Executor(executorId: String,executorHostname: String,env: SparkEnv,userClassPath: Seq[URL] = Nil,isLocal: Boolean = false,uncaughtExceptionHandler: UncaughtExceptionHandler = new SparkUncaughtExceptionHandler)extends Logging {//针对shuffle任务,实例化ShuffleMapTask对象@volatile var task: Task[Any] = _override def run(): Unit = {//---------其他代码---------try {//---------其他代码---------val value = Utils.tryWithSafeFinally {//针对shuffle任务,执行ShuffleMapTask#runTask函数val res = task.run(taskAttemptId = taskId,attemptNumber = taskDescription.attemptNumber,metricsSystem = env.metricsSystem)threwException = falseres}//---------其他代码---------} catch {//---------其他代码---------} finally {runningTasks.remove(taskId)}}}
  • inside ShuffleMapTask.runTask(), a ShuffleWriter object is instantiated, and ShuffleWriter.write writes the data to disk;
  • when the ShuffleWriter object is instantiated, the shuffleHandle held by the stage's dependency is passed to it;
private[spark] class ShuffleMapTask(stageId: Int,stageAttemptId: Int,taskBinary: Broadcast[Array[Byte]],partition: Partition,@transient private var locs: Seq[TaskLocation],localProperties: Properties,serializedTaskMetrics: Array[Byte],jobId: Option[Int] = None,appId: Option[String] = None,appAttemptId: Option[String] = None,isBarrier: Boolean = false)extends Task[MapStatus](stageId, stageAttemptId, partition.index, localProperties,serializedTaskMetrics, jobId, appId, appAttemptId, isBarrier)with Logging {override def runTask(context: TaskContext): MapStatus = {//---------其他代码---------val ser = SparkEnv.get.closureSerializer.newInstance()val (rdd, dep) = ser.deserialize[(RDD[_], ShuffleDependency[_, _, _])](ByteBuffer.wrap(taskBinary.value), Thread.currentThread.getContextClassLoader)//---------其他代码---------var writer: ShuffleWriter[Any, Any] = nulltry {val manager = SparkEnv.get.shuffleManager//从shuffleManager中获取写入器:将依赖中维护的shuffleHandle传过去writer = manager.getWriter[Any, Any](dep.shuffleHandle, partitionId, context)//通过写入器将数据落地磁盘writer.write(rdd.iterator(partition, context).asInstanceOf[Iterator[_ <: Product2[Any, Any]]])//关闭写入器writer.stop(success = true).get} catch {//---------其他代码---------}}
}

3.1.2. Instantiating the ShuffleWriter Object

Notes:

  • UnsafeShuffleWriter, BypassMergeSortShuffleWriter and SortShuffleWriter are the subclasses of ShuffleWriter;
  • SerializedShuffleHandle, BypassMergeSortShuffleHandle and BaseShuffleHandle are the subclasses of ShuffleHandle;
  • the concrete subclass of the ShuffleHandle instance determines which ShuffleWriter subclass gets instantiated;
private[spark] class SortShuffleManager(conf: SparkConf) extends ShuffleManager with Logging {override def getWriter[K, V](handle: ShuffleHandle,mapId: Int,context: TaskContext): ShuffleWriter[K, V] = {numMapsForShuffle.putIfAbsent(handle.shuffleId, handle.asInstanceOf[BaseShuffleHandle[_, _, _]].numMaps)val env = SparkEnv.get//rdd依赖中维护的ShuffleHandle类型,实例化对应的writerhandle match {case unsafeShuffleHandle: SerializedShuffleHandle[K @unchecked, V @unchecked] =>new UnsafeShuffleWriter(env.blockManager,shuffleBlockResolver.asInstanceOf[IndexShuffleBlockResolver],context.taskMemoryManager(),unsafeShuffleHandle,mapId,context,env.conf)case bypassMergeSortHandle: BypassMergeSortShuffleHandle[K @unchecked, V @unchecked] =>new BypassMergeSortShuffleWriter(env.blockManager,shuffleBlockResolver.asInstanceOf[IndexShuffleBlockResolver],bypassMergeSortHandle,mapId,context,env.conf)case other: BaseShuffleHandle[K @unchecked, V @unchecked, _] =>new SortShuffleWriter(shuffleBlockResolver, other, mapId, context)}}
}

4. BypassMergeSortShuffleWriter

4.1. Instantiation

Analysis of the main fields:

  • fileBufferSize: size of the file buffer used when writing data to disk, 32 KB by default;
    • configurable via the spark.shuffle.file.buffer parameter
  • transferToEnabled: whether file data is copied via NIO when the temporary files are merged: true by default (spark.file.transferTo);
  • partitionWriterSegments: the temporary-file handles, cached as a FileSegment array;
  • partitionLengths: the amount of data per partition, cached as a long array;
final class BypassMergeSortShuffleWriter<K, V> extends ShuffleWriter<K, V> {private static final Logger logger = LoggerFactory.getLogger(BypassMergeSortShuffleWriter.class);//数据写入磁盘时的文件缓存,默认32kb,通过spark.shuffle.file.buffer参数指定private final int fileBufferSize;//合并临时文件时,是否通过NIO方式赋值文件数据:默认true;通过spark.file.transferTo参数指定private final boolean transferToEnabled;//分区数private final int numPartitions;private final BlockManager blockManager;//分区器private final Partitioner partitioner;private final ShuffleWriteMetrics writeMetrics;//本次shuffle的唯一标识private final int shuffleId;private final int mapId;private final Serializer serializer;//创建和维护shuffle数据的逻辑块和物理文件位置的对应关系private final IndexShuffleBlockResolver shuffleBlockResolver;//每个分区的写出器private DiskBlockObjectWriter[] partitionWriters;//每个分区的临时文件private FileSegment[] partitionWriterSegments;//文件输出状态信息@Nullable private MapStatus mapStatus;//每个分区的数据量private long[] partitionLengths;private boolean stopping = false;
}

4.2. write

How it works:

  • build one temporary file per partition through the blockManager, and build a per-partition disk writer on top of each temporary file;
  • iterate over the records, appending each one to the output stream of its partition's temporary file;
  • flush the streams so the buffered data actually lands in the temporary files on disk;
  • merge all the per-partition temporary files into one big data file and generate the matching index file;
  • record the output status

Notes:

  • a single call to write produces one temporary file per partition; the merge always produces exactly one big data file plus one index file;
  • the index file records the length and offset of each partition's data;
final class BypassMergeSortShuffleWriter<K, V> extends ShuffleWriter<K, V> {public void write(Iterator<Product2<K, V>> records) throws IOException {assert this.partitionWriters == null;if (!records.hasNext()) {//空记录,生成一个空index文件this.partitionLengths = new long[this.numPartitions];this.shuffleBlockResolver.writeIndexFileAndCommit(this.shuffleId, this.mapId, this.partitionLengths, (File)null);this.mapStatus = .MODULE$.apply(this.blockManager.shuffleServerId(), this.partitionLengths);} else {//初始化序列化器SerializerInstance serInstance = this.serializer.newInstance();long openStartTime = System.nanoTime();//初始化分区写出器数组this.partitionWriters = new DiskBlockObjectWriter[this.numPartitions];//初始化分区对应临时文件数组this.partitionWriterSegments = new FileSegment[this.numPartitions];//通过blockManager获取block的临时文件,并以此构建分区的磁盘写出器//一个分区对应磁盘写出器和一个临时文件int i;for(i = 0; i < this.numPartitions; ++i) {//通过blockManager构建block的临时文件信息:blockId,临时文件Tuple2<TempShuffleBlockId, File> tempShuffleBlockIdPlusFile = this.blockManager.diskBlockManager().createTempShuffleBlock();File file = (File)tempShuffleBlockIdPlusFile._2();BlockId blockId = (BlockId)tempShuffleBlockIdPlusFile._1();//构建当前分区的磁盘写出器:绑定临时文件、文件输出流;this.partitionWriters[i] = this.blockManager.getDiskWriter(blockId, file, serInstance, this.fileBufferSize, this.writeMetrics);}this.writeMetrics.incWriteTime(System.nanoTime() - openStartTime);//将数据逐条添加到各分区对应临时文件的输出流中while(records.hasNext()) {Product2<K, V> record = (Product2)records.next();K key = record._1();//将数据添加到临时文件输出流中this.partitionWriters[this.partitioner.getPartition(key)].write(key, record._2());}//将各分区从文件输出流flush到临时文件中for(i = 0; i < this.numPartitions; ++i) {//获取当前分区的磁盘写出器DiskBlockObjectWriter writer = this.partitionWriters[i];//文件flush,返回记录的偏移量、长度、临时文件this.partitionWriterSegments[i] = writer.commitAndGet();writer.close();}//获取输出数据文件File output = this.shuffleBlockResolver.getDataFile(this.shuffleId, this.mapId);//创建输出数据文件的临时文件File tmp = Utils.tempFileWith(output);try {//临时文件合并产生数据文件,返回临时文件长度数组this.partitionLengths = this.writePartitionedFile(tmp);//生成数据文件对应的index文件this.shuffleBlockResolver.writeIndexFileAndCommit(this.shuffleId, this.mapId, this.partitionLengths, tmp);} finally {if (tmp.exists() && !tmp.delete()) {logger.error("Error while deleting temp file {}", tmp.getAbsolutePath());}}//记录数据输出状态this.mapStatus = .MODULE$.apply(this.blockManager.shuffleServerId(), this.partitionLengths);}}
}

4.2.1. writePartitionedFile - merging temporary files

Notes:

  • the per-partition temporary files are copied into the data file one by one, in partition order; (a standalone sketch of this kind of NIO concatenation follows the source listing below)
  • by default the copy is done via NIO (transferTo);
    • controlled by the spark.file.transferTo parameter;
final class BypassMergeSortShuffleWriter<K, V> extends ShuffleWriter<K, V> {private long[] writePartitionedFile(File outputFile) throws IOException {//初始化分区数据量数组final long[] lengths = new long[numPartitions];if (partitionWriters == null) {//没有分区写出器,代表分区写出器构造没有执行,不存在数据写出,不存在临时文件//返回空数组return lengths;}//构建数据文件输出流final FileOutputStream out = new FileOutputStream(outputFile, true);final long writeStartTime = System.nanoTime();boolean threwException = true;try {//遍历分区for (int i = 0; i < numPartitions; i++) {//取出分区临时文件final File file = partitionWriterSegments[i].file();if (file.exists()) {//构建临时文件输入流final FileInputStream in = new FileInputStream(file);boolean copyThrewException = true;try {//将分区临时文件数据复制到数据文件中//transferToEnabled默认问true:以NIO的方式复制lengths[i] = Utils.copyStream(in, out, false, transferToEnabled);copyThrewException = false;} finally {Closeables.close(in, copyThrewException);}if (!file.delete()) {logger.error("Unable to delete file for partition {}", i);}}}threwException = false;} finally {Closeables.close(out, threwException);writeMetrics.incWriteTime(System.nanoTime() - writeStartTime);}partitionWriters = null;return lengths;}
}
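
The essence of this merge is appending each partition file to a single output file and remembering how many bytes each contributed. A minimal standalone sketch of that transferTo-style concatenation (illustration only; it is not Spark's Utils.copyStream, and concatenateSketch is a made-up name):

import java.io.{File, FileInputStream, FileOutputStream}

object ConcatenationSketch {
  // Append each partition's temporary file to the single output file through the
  // FileChannel API; the number of bytes copied becomes that partition's length.
  def concatenateSketch(partFiles: Seq[File], output: File): Array[Long] = {
    val out = new FileOutputStream(output, true).getChannel  // append mode
    try {
      partFiles.map { f =>
        val in = new FileInputStream(f).getChannel
        try {
          var copied = 0L
          // transferTo may copy fewer bytes than requested, so loop until the file is done
          while (copied < in.size()) {
            copied += in.transferTo(copied, in.size() - copied, out)
          }
          copied
        } finally in.close()
      }.toArray
    } finally out.close()
  }
}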

5. SortShuffleWriter

5.1. Instantiation

Instantiating SortShuffleWriter is comparatively simple; there are not many fields to initialize;

private[spark] class SortShuffleWriter[K, V, C](shuffleBlockResolver: IndexShuffleBlockResolver,handle: BaseShuffleHandle[K, V, C],mapId: Int,context: TaskContext)extends ShuffleWriter[K, V] with Logging {//依赖关系private val dep = handle.dependencyprivate val blockManager = SparkEnv.get.blockManager//排序器private var sorter: ExternalSorter[K, V, _] = nullprivate var stopping = falseprivate var mapStatus: MapStatus = nullprivate val writeMetrics = context.taskMetrics().shuffleWriteMetrics}

5.2. write

How it works:

  • build a sorter (ExternalSorter)
  • add all the records to the sorter;
    • the data is cached in memory in a PartitionedAppendOnlyMap or a PartitionedPairBuffer;
    • when the cached data reaches the spill condition, the whole cache is spilled to disk into one temporary file;
  • write the sorter's data out to a single temporary file;
  • build the index file for that file;
  • record the output status;

Extra notes:

  • the data is written to the output file in partition-id order;
  • with map-side aggregation, the data is aggregated first and only then written out;
  • one call to SortShuffleWriter.write() produces one data file on disk plus one matching index file;
private[spark] class SortShuffleWriter[K, V, C](shuffleBlockResolver: IndexShuffleBlockResolver,handle: BaseShuffleHandle[K, V, C],mapId: Int,context: TaskContext)extends ShuffleWriter[K, V] with Logging {override def write(records: Iterator[Product2[K, V]]): Unit = {//构建排序器sorter = if (dep.mapSideCombine) {//map端聚合,需要定义排序器的聚合器、排序方式new ExternalSorter[K, V, C](context, dep.aggregator, Some(dep.partitioner), dep.keyOrdering, dep.serializer)} else {// 在这种情况下,我们既没有向排序器传递聚合器,也没有向排序器传递排序器,因为我们不关心键是否在每个分区中排序;如果正在运行的操作是sortByKey,则将在reduce端执行new ExternalSorter[K, V, V](context, aggregator = None, Some(dep.partitioner), ordering = None, dep.serializer)}//数据添加到排序器:满足溢写条件情况下,缓存数据将会溢写到磁盘;否则,在内存中缓存sorter.insertAll(records)//获取输出数据文件val output = shuffleBlockResolver.getDataFile(dep.shuffleId, mapId)//创建输出文件的临时文件val tmp = Utils.tempFileWith(output)try {//组装blockIdval blockId = ShuffleBlockId(dep.shuffleId, mapId, IndexShuffleBlockResolver.NOOP_REDUCE_ID)//临时文件合并数据到数据文件:根据参数进行排序聚合val partitionLengths = sorter.writePartitionedFile(blockId, tmp)//创建数据文件的index文件shuffleBlockResolver.writeIndexFileAndCommit(dep.shuffleId, mapId, partitionLengths, tmp)//记录数据输出状态mapStatus = MapStatus(blockManager.shuffleServerId, partitionLengths)} finally {if (tmp.exists() && !tmp.delete()) {logError(s"Error while deleting temp file ${tmp.getAbsolutePath}")}}}
}

5.2.1. ExternalSorter#insertAll - adding data to the ExternalSorter

In-memory caches:

  • PartitionedAppendOnlyMap
    • used when map-side aggregation is needed;
    • values are aggregated under the key (partitionId, key);
    • one entry is ((partitionId, key), aggregated value)
  • PartitionedPairBuffer
    • used when there is no map-side aggregation;
    • records are appended as-is, together with their partition id, without any merging;

Steps (a small sketch of the combine semantics follows the source listing below):

  • iterate over the records and cache them one by one, depending on whether map-side aggregation is enabled;
    • with map-side aggregation, records go into the PartitionedAppendOnlyMap, where values are merged under the key (partitionId, key);
    • without map-side aggregation, records go straight into the PartitionedPairBuffer;
  • after every cached record, check once whether the cached data needs to be spilled to disk

Extra notes:

  • if no spill ever happens, all the data stays cached in memory in the collection;
  • inside the ExternalSorter the records can therefore end up in one of three states:
    • entirely cached in memory in the collection
    • entirely on disk as temporary files, with the spills field of the ExternalSorter keeping references to all of them;
    • partly cached in memory and partly spilled to temporary files on disk;
private[spark] class ExternalSorter[K, V, C](context: TaskContext,aggregator: Option[Aggregator[K, V, C]] = None,partitioner: Option[Partitioner] = None,ordering: Option[Ordering[K]] = None,serializer: Serializer = SparkEnv.get.serializer)extends Spillable[WritablePartitionedPairCollection[K, C]](context.taskMemoryManager())with Logging {@volatile private var map = new PartitionedAppendOnlyMap[K, C]@volatile private var buffer = new PartitionedPairBuffer[K, C]def insertAll(records: Iterator[Product2[K, V]]): Unit = {// 聚合器部位None,则需要map端聚合val shouldCombine = aggregator.isDefined//map端预聚合,数据以PartitionedAppendOnlyMap形式缓存if (shouldCombine) {// Combine values in-memory first using our AppendOnlyMapval mergeValue = aggregator.get.mergeValueval createCombiner = aggregator.get.createCombinervar kv: Product2[K, V] = null//数据聚合:已有数据,就聚合;没有数据就新建一个数据;val update = (hadValue: Boolean, oldValue: C) => {if (hadValue) mergeValue(oldValue, kv._2) else createCombiner(kv._2)}//遍历记录数据while (records.hasNext) {//自上次溢写后,本次从记录数据中读取数据的数量+1addElementsRead()//从记录中读取一条数据kv = records.next()//数据根据(分区,key)进行聚合map.changeValue((getPartition(kv._1), kv._1), update)//判断缓存数据是否需要溢写到磁盘maybeSpillCollection(usingMap = true)}} else {// 非map端聚合,数据以PartitionedPairBuffer形式缓存while (records.hasNext) {addElementsRead()val kv = records.next()buffer.insert(getPartition(kv._1), kv._1, kv._2.asInstanceOf[C])maybeSpillCollection(usingMap = false)}}}
}
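
The update closure above is the core of map-side combining: for an existing (partitionId, key) the new value is merged in, otherwise a fresh combiner is created. A minimal sketch of the same semantics with an ordinary mutable map (illustration only; Spark uses PartitionedAppendOnlyMap, which additionally tracks its estimated size for spilling, and the 4-partition hash partitioner here is made up):

import scala.collection.mutable

object MapSideCombineSketch extends App {
  val numPartitions = 4
  def getPartition(key: String): Int = math.abs(key.hashCode) % numPartitions

  // Word-count style combiner functions
  val createCombiner: Int => Int = v => v
  val mergeValue: (Int, Int) => Int = _ + _

  val map = mutable.Map.empty[(Int, String), Int]
  val records = Seq("a" -> 1, "b" -> 1, "a" -> 1)

  for ((k, v) <- records) {
    val mapKey = (getPartition(k), k)
    map(mapKey) = map.get(mapKey) match {
      case Some(old) => mergeValue(old, v)   // had a value: merge it
      case None      => createCombiner(v)    // first time: create the combiner
    }
  }
  println(map)   // ((p, "a"), 2) and ((p, "b"), 1)
}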

5.2.1.1. ExternalSorter#maybeSpillCollection - deciding whether to spill cached data to disk

Spill conditions:

  • the number of records read into the collection is a multiple of 32 && the estimated size of the cache is at least the current threshold && the estimated size is still at least the threshold after the threshold has been expanded;
  • the number of records read into the collection exceeds Integer.MAX_VALUE (0x7fffffff)
  • a spill happens as soon as either of these two conditions holds

Spilling:

  • the actual spill logic is implemented by ExternalSorter#spill;
  • each spill produces one temporary spill file;
  • for the data added to an ExternalSorter, all of its spill files are tracked in the ExternalSorter#spills field;

The collection threshold (a small worked example of the arithmetic follows the source listing below):

  • initial threshold
    • set by the spark.shuffle.spill.initialMemoryThreshold parameter;
    • 5 MB by default;
  • threshold expansion
    • memory requested on each expansion: 2 * current collection size - current threshold (whatever is actually granted is added to the threshold);
    • when it happens: whenever the current collection size >= the current threshold;

Extra notes:

  • after the cached data has been spilled to disk, the collection used for caching is rebuilt from scratch;
private[spark] class ExternalSorter[K, V, C](context: TaskContext,aggregator: Option[Aggregator[K, V, C]] = None,partitioner: Option[Partitioner] = None,ordering: Option[Ordering[K]] = None,serializer: Serializer = SparkEnv.get.serializer)extends Spillable[WritablePartitionedPairCollection[K, C]](context.taskMemoryManager())with Logging {@volatile private var map = new PartitionedAppendOnlyMap[K, C]@volatile private var buffer = new PartitionedPairBuffer[K, C]//到目前为止观察到的内存中数据结构的峰值大小,以字节为单位private var _peakMemoryUsedBytes: Long = 0Ldef peakMemoryUsedBytes: Long = _peakMemoryUsedBytesprivate def maybeSpillCollection(usingMap: Boolean): Unit = {var estimatedSize = 0Lif (usingMap) {//map端聚合//估计集合的当前大小(以字节为单位)estimatedSize = map.estimateSize()//内存数据溢写到磁盘if (maybeSpill(map, estimatedSize)) {//发生数据溢写后,重新构建缓存集合map = new PartitionedAppendOnlyMap[K, C]}} else {//估计集合的当前大小(以字节为单位)estimatedSize = buffer.estimateSize()//内存数据溢写到磁盘if (maybeSpill(buffer, estimatedSize)) {//发生数据溢写后,重新构建缓存集合buffer = new PartitionedPairBuffer[K, C]}}//更新内存中数据结构的峰值大小if (estimatedSize > _peakMemoryUsedBytes) {_peakMemoryUsedBytes = estimatedSize}}
}private[spark] abstract class Spillable[C](taskMemoryManager: TaskMemoryManager)extends MemoryConsumer(taskMemoryManager) with Logging {// 自上次溢出后从输入中读取的元素数protected def elementsRead: Int = _elementsReadprivate[this] var _elementsRead = 0// 集合大小的初始阈值:默认5Mprivate[this] val initialMemoryThreshold: Long =SparkEnv.get.conf.getLong("spark.shuffle.spill.initialMemoryThreshold", 5 * 1024 * 1024)//集合的字节大小阈值//为了避免大量的小溢出,初始化该值为数量级> 0@volatile private[this] var myMemoryThreshold = initialMemoryThreshold//溢写的总字节数@volatile private[this] var _memoryBytesSpilled = 0L//发生溢写的次数private[this] var _spillCount = 0//如果需要,将当前内存中的收集信息溢出到磁盘。试图在溢出之前获取更多内存protected def maybeSpill(collection: C, currentMemory: Long): Boolean = {var shouldSpill = false//读取数据量是32的倍数,且集合数据内存占用量不小于集合设置的阈值if (elementsRead % 32 == 0 && currentMemory >= myMemoryThreshold) {//集合内存阈值扩充:2倍当前使用量 - 原有阈值val amountToRequest = 2 * currentMemory - myMemoryThresholdval granted = acquireMemory(amountToRequest)myMemoryThreshold += granted// 如果申请到的不够,map或buffer预估占用内存量还是大于阈值,确定溢写shouldSpill = currentMemory >= myMemoryThreshold}//如果上面判定不需要溢写,但读取的记录总数比Integer.MAX_VALUE大,也还是得溢写//numElementsForceSpillThreshold:Integer.MAX_VALUE   0x7fffffffshouldSpill = shouldSpill || _elementsRead > numElementsForceSpillThreshold//溢写实施if (shouldSpill) {//溢写次数+1_spillCount += 1logSpillage(currentMemory)//溢写spill(collection)//清空数据读取量_elementsRead = 0//累加溢写总字节数_memoryBytesSpilled += currentMemory//释放内存releaseMemory()}//返回溢写判断结果shouldSpill}
}
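
A small worked example of the threshold arithmetic in maybeSpill, assuming the default 5 MB initial threshold (the sizes below are made up for illustration):

object MaybeSpillArithmetic extends App {
  var myMemoryThreshold = 5L * 1024 * 1024   // initial threshold: 5 MB
  val currentMemory     = 6L * 1024 * 1024   // estimated collection size: 6 MB

  // Amount requested from the memory manager: 2 * current - threshold = 7 MB
  val amountToRequest = 2 * currentMemory - myMemoryThreshold
  println(s"request ${amountToRequest / 1024 / 1024} MB")   // 7

  // If the full 7 MB were granted, the threshold would become 12 MB and no spill would happen (6 < 12).
  // If only 512 KB is granted, the threshold becomes 5.5 MB and the collection spills (6 >= 5.5).
  val granted = 512L * 1024
  myMemoryThreshold += granted
  val shouldSpill = currentMemory >= myMemoryThreshold
  println(s"threshold=$myMemoryThreshold shouldSpill=$shouldSpill")   // true
}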

5.2.1.2. ExternalSorter#spill - spilling cached data to disk

Steps:

  • sort the data in the collection with the sort comparator and get an iterator over the sorted data
    • with a comparator, the data inside each partition is ordered by key (by hash code), ascending (a sketch of that comparator follows the source listing below)
    • without a comparator, the data is only ordered by partition, ascending
  • spill the sorted data to disk and return the spill file;
    • create a temporary file through the diskBlockManager
    • create a writer for the temporary file through the blockManager;
    • iterate over the sorted data, appending records one by one to the writer and counting them;
    • every 10000 records, flush the writer so the batch actually lands in the temporary file;
    • when the iteration finishes, flush the remaining records (fewer than 10000);
    • return the temporary file;
  • add the spill file to the collection of temporary spill files;

Extra notes:

  • the collection's data lands in the disk file batch by batch; the batch size is set by spark.shuffle.spill.batchSize, 10000 by default;
private[spark] class ExternalSorter[K, V, C](context: TaskContext,aggregator: Option[Aggregator[K, V, C]] = None,partitioner: Option[Partitioner] = None,ordering: Option[Ordering[K]] = None,serializer: Serializer = SparkEnv.get.serializer)extends Spillable[WritablePartitionedPairCollection[K, C]](context.taskMemoryManager())with Logging {private val spills = new ArrayBuffer[SpilledFile]private val serInstance = serializer.newInstance()//文件缓存大小,默认32Kprivate val fileBufferSize = conf.getSizeAsKb("spark.shuffle.file.buffer", "32k").toInt * 1024//批处理记录数量,默认10000private val serializerBatchSize = conf.getLong("spark.shuffle.spill.batchSize", 10000)//记录每个分区的数据量val elementsPerPartition = new Array[Long](numPartitions)  //集合数据溢写到磁盘override protected[this] def spill(collection: WritablePartitionedPairCollection[K, C]): Unit = {//对集合中的数据根据排序比较器进行排序,获取排序后数据迭代器//有排序比较器,对分区内数据根据key升序排序//没有排序比较器,根据分区进行升序排序val inMemoryIterator = collection.destructiveSortedWritablePartitionedIterator(comparator)//数据溢写到磁盘val spillFile = spillMemoryIteratorToDisk(inMemoryIterator)//添加到临时文件集合spills += spillFile}  //排序比较器:分区内对key进行升序排列    private val keyComparator: Comparator[K] = ordering.getOrElse(new Comparator[K] {override def compare(a: K, b: K): Int = {val h1 = if (a == null) 0 else a.hashCode()val h2 = if (b == null) 0 else b.hashCode()if (h1 < h2) -1 else if (h1 == h2) 0 else 1}})//获取排序比较器private def comparator: Option[Comparator[K]] = {if (ordering.isDefined || aggregator.isDefined) {Some(keyComparator)} else {None}}//数据溢写到磁盘private[this] def spillMemoryIteratorToDisk(inMemoryIterator: WritablePartitionedIterator): SpilledFile = {// 因为溢写文件在shuffle过程中会被读取,因此它们的压缩不由spill相关参数控制// 创建一个临时块val (blockId, file) = diskBlockManager.createTempShuffleBlock()//记录每次溢写的数据量var objectsWritten: Long = 0val spillMetrics: ShuffleWriteMetrics = new ShuffleWriteMetrics//获取临时文件块磁盘写入器val writer: DiskBlockObjectWriter =blockManager.getDiskWriter(blockId, file, serInstance, fileBufferSize, spillMetrics)// List of batch sizes (bytes) in the order they are written to diskval batchSizes = new ArrayBuffer[Long]// How many elements we have in each partitionval elementsPerPartition = new Array[Long](numPartitions)//文件flush,返回记录的偏移量、长度、临时文件def flush(): Unit = {//将磁盘写出器序列化流中数据flush到临时文件中val segment = writer.commitAndGet()batchSizes += segment.length_diskBytesSpilled += segment.lengthobjectsWritten = 0}var success = falsetry {//记录数据遍历while (inMemoryIterator.hasNext) {val partitionId = inMemoryIterator.nextPartition()require(partitionId >= 0 && partitionId < numPartitions,s"partition Id: ${partitionId} should be in the range [0, ${numPartitions})")//数据逐条添加到磁盘写出器序列化流中inMemoryIterator.writeNext(writer)//累加记录所附分区的数据量elementsPerPartition(partitionId) += 1//累加当次溢写的数据量objectsWritten += 1//每10000条数据落地到磁盘文件一次if (objectsWritten == serializerBatchSize) {flush()}}//遍历完毕,将剩余不足10000的数据的落地到磁盘if (objectsWritten > 0) {flush()} else {writer.revertPartialWritesAndClose()}success = true} finally {if (success) {writer.close()} else {// writer.revertPartialWritesAndClose()if (file.exists()) {if (!file.delete()) {logWarning(s"Error deleting ${file}")}}}}//返回溢写文件SpilledFile(file, blockId, batchSizes.toArray, elementsPerPartition)}
}
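
Note that the keyComparator above orders keys inside a partition by their hash code, not by their natural ordering, so two different keys with equal hashes compare as equal and the merge code still has to check real key equality. A minimal standalone sketch of such a comparator (illustration only; hashComparator is a made-up name):

import java.util.Comparator

object HashComparatorSketch extends App {
  def hashComparator[K]: Comparator[K] = new Comparator[K] {
    override def compare(a: K, b: K): Int = {
      val h1 = if (a == null) 0 else a.hashCode()
      val h2 = if (b == null) 0 else b.hashCode()
      java.lang.Integer.compare(h1, h2)
    }
  }

  // "Aa" and "BB" share the same hash code, so they compare as equal even though the keys differ
  println(hashComparator[String].compare("Aa", "BB"))   // 0
}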

5.2.2. ExternalSorter#writePartitionedFile - producing the complete data file

Purpose:

  • merge the spilled temporary files with the data still cached in memory into one complete data file;

Details:

  • no spill ever happened
    • sort the cached data by partition id and key;
    • write the sorted data into the output file partition by partition, one batch per partition;
  • spills did happen
    • merge the data in the spill files on disk with the data cached in memory, partition by partition
    • write the merged data into the output file partition by partition, one batch per partition;
private[spark] class ExternalSorter[K, V, C](context: TaskContext,aggregator: Option[Aggregator[K, V, C]] = None,partitioner: Option[Partitioner] = None,ordering: Option[Ordering[K]] = None,serializer: Serializer = SparkEnv.get.serializer)extends Spillable[WritablePartitionedPairCollection[K, C]](context.taskMemoryManager())with Logging {def writePartitionedFile(blockId: BlockId,outputFile: File): Array[Long] = {// Track location of each range in the output fileval lengths = new Array[Long](numPartitions)//通过blockManager获取输出文件写入器val writer = blockManager.getDiskWriter(blockId, outputFile, serInstance, fileBufferSize,context.taskMetrics().shuffleWriteMetrics)if (spills.isEmpty) {//没有发生溢写// 确定数据在内存中的缓存形式val collection = if (aggregator.isDefined) map else buffer//数据排序:根据分区id和key排序val it = collection.destructiveSortedWritablePartitionedIterator(comparator)//排序后的数据遍历:将数据根据分区依次写入输出文件中while (it.hasNext) {val partitionId = it.nextPartition()//同一个分区的数据依次添加到写出器序列化流中:同一个分区的数据在一起while (it.hasNext && it.nextPartition() == partitionId) {it.writeNext(writer)}//序列化流中数据flush到文件,返回记录的偏移量、长度、临时文件val segment = writer.commitAndGet()//缓存分区与分区数据量lengths(partitionId) = segment.length}} else {//有发生数据溢写// 溢写文件和缓存数据合并,合并后再根据分区依次写入输出文件中for ((id, elements) <- this.partitionedIterator) {if (elements.hasNext) {//将分区中的数据依次添加到写出器序列化流中for (elem <- elements) {writer.write(elem._1, elem._2)}//序列化流中数据flush到文件,返回记录的偏移量、长度、临时文件val segment = writer.commitAndGet()//缓存分区与分区数据量lengths(id) = segment.length}}}writer.close()context.taskMetrics().incMemoryBytesSpilled(memoryBytesSpilled)context.taskMetrics().incDiskBytesSpilled(diskBytesSpilled)context.taskMetrics().incPeakExecutionMemory(peakMemoryUsedBytes)lengths}
}

5.2.2.1. ExternalSorter#partitionedIterator - merging on-disk and in-memory data

Details:

  • read the temporary spill files on disk and the data cached in memory;
  • for each partition, merge the on-disk and in-memory data belonging to that partition; (a small k-way merge sketch follows the source listing below)
  • aggregate or sort the merged data:
    • an aggregator is defined: perform partial aggregation across the merged streams, combining values with the aggregator and finally ordering by key;
    • no aggregator but an ordering is defined: sort the records with the ordering instead of merging them;
    • neither an aggregator nor an ordering is defined: return the merged result as is;
  • return the mapping from partition id to that partition's data iterator
private[spark] class ExternalSorter[K, V, C](context: TaskContext,aggregator: Option[Aggregator[K, V, C]] = None,partitioner: Option[Partitioner] = None,ordering: Option[Ordering[K]] = None,serializer: Serializer = SparkEnv.get.serializer)extends Spillable[WritablePartitionedPairCollection[K, C]](context.taskMemoryManager())with Logging {def partitionedIterator: Iterator[(Int, Iterator[Product2[K, C]])] = {val usingMap = aggregator.isDefinedval collection: WritablePartitionedPairCollection[K, C] = if (usingMap) map else bufferif (spills.isEmpty) {//未发生溢写//未定义排序规则if (!ordering.isDefined) {// 只按分区ID排序,而不是keygroupByPartition(destructiveIterator(collection.partitionedDestructiveSortedIterator(None)))} else {//定义排序规则// 根据分区ID和key进行排序groupByPartition(destructiveIterator(collection.partitionedDestructiveSortedIterator(Some(keyComparator))))}} else {//发生溢写//合并溢出的和内存中的数据merge(spills, destructiveIterator(collection.partitionedDestructiveSortedIterator(comparator)))}}//数据合并private def merge(spills: Seq[SpilledFile], inMemory: Iterator[((Int, K), C)]): Iterator[(Int, Iterator[Product2[K, C]])] = {//从磁盘读取溢写的临时文件val readers = spills.map(new SpillReader(_))//获取内存中缓存的数据val inMemBuffered = inMemory.buffered//遍历分区(0 until numPartitions).iterator.map { p =>//获取内存中当前分区数据  val inMemIterator = new IteratorForPartition(p, inMemBuffered)//将磁盘文件数据和内存缓存数据合流val iterators = readers.map(_.readNextPartition()) ++ Seq(inMemIterator)if (aggregator.isDefined) {//定义了聚合器// 跨分区执行部分聚合:根据聚合器定义聚合value,最后按照key排序(p, mergeWithAggregation(iterators, aggregator.get.mergeCombiners, keyComparator, ordering.isDefined))} else if (ordering.isDefined) {//没有聚合器,但是定义了排序器// 对元素进行排序,而不是合并它们(p, mergeSort(iterators, ordering.get))} else {//没有定义聚合器和排序器//返回合并后的结果(p, iterators.iterator.flatten)}}}
}
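
The mergeSort branch above is a classic k-way merge of iterators that are each already sorted. A self-contained sketch of the idea with a priority queue (illustration only; kWayMerge is a made-up name, while Spark's ExternalSorter#mergeSort likewise keeps BufferedIterators in a heap keyed by their head element):

import scala.collection.mutable

object KWayMergeSketch extends App {
  // Merge several iterators that are each already sorted by `ord` into one sorted iterator.
  def kWayMerge[T](iters: Seq[Iterator[T]])(implicit ord: Ordering[T]): Iterator[T] = {
    // PriorityQueue is a max-heap, so reverse the ordering of the iterators' head elements
    val heap = mutable.PriorityQueue.empty[BufferedIterator[T]](
      Ordering.by[BufferedIterator[T], T](_.head)(ord.reverse))
    iters.map(_.buffered).filter(_.hasNext).foreach(heap.enqueue(_))

    new Iterator[T] {
      override def hasNext: Boolean = heap.nonEmpty
      override def next(): T = {
        val it = heap.dequeue()        // iterator with the smallest head element
        val elem = it.next()
        if (it.hasNext) heap.enqueue(it)
        elem
      }
    }
  }

  // Example: merging a spilled run with an in-memory run
  println(kWayMerge(Seq(Iterator(1, 4, 7), Iterator(2, 3, 9))).toList)   // List(1, 2, 3, 4, 7, 9)
}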

6. UnsafeShuffleWriter

6.1. Instantiation

Notes:

  • the instance is created by calling the UnsafeShuffleWriter constructor directly;
  • UnsafeShuffleWriter requires the shuffle's partition count to be at most 16777216
  • the sorter is initialized with an initial buffer size of 4096 entries
  • the serialization buffer is initialized with a size of 1 MB
public class UnsafeShuffleWriter<K, V> extends ShuffleWriter<K, V> {private static final Logger logger = LoggerFactory.getLogger(UnsafeShuffleWriter.class);private static final ClassTag<Object> OBJECT_CLASS_TAG;@VisibleForTestingstatic final int DEFAULT_INITIAL_SORT_BUFFER_SIZE = 4096;static final int DEFAULT_INITIAL_SER_BUFFER_SIZE = 1048576;private final BlockManager blockManager;private final IndexShuffleBlockResolver shuffleBlockResolver;private final TaskMemoryManager memoryManager;private final SerializerInstance serializer;private final Partitioner partitioner;private final ShuffleWriteMetrics writeMetrics;private final int shuffleId;private final int mapId;private final TaskContext taskContext;private final SparkConf sparkConf;private final boolean transferToEnabled;private final int initialSortBufferSize;private final int inputBufferSizeInBytes;private final int outputBufferSizeInBytes;@Nullableprivate MapStatus mapStatus;@Nullableprivate ShuffleExternalSorter sorter;private long peakMemoryUsedBytes = 0L;private UnsafeShuffleWriter.MyByteArrayOutputStream serBuffer;private SerializationStream serOutputStream;private boolean stopping = false;public UnsafeShuffleWriter(BlockManager blockManager, IndexShuffleBlockResolver shuffleBlockResolver, TaskMemoryManager memoryManager, SerializedShuffleHandle<K, V> handle, int mapId, TaskContext taskContext, SparkConf sparkConf) throws IOException {int numPartitions = handle.dependency().partitioner().numPartitions();//分区数不能大于16777216if (numPartitions > SortShuffleManager.MAX_SHUFFLE_OUTPUT_PARTITIONS_FOR_SERIALIZED_MODE()) {throw new IllegalArgumentException("UnsafeShuffleWriter can only be used for shuffles with at most " + SortShuffleManager.MAX_SHUFFLE_OUTPUT_PARTITIONS_FOR_SERIALIZED_MODE() + " reduce partitions");} else {this.blockManager = blockManager;this.shuffleBlockResolver = shuffleBlockResolver;this.memoryManager = memoryManager;this.mapId = mapId;ShuffleDependency<K, V, V> dep = handle.dependency();this.shuffleId = dep.shuffleId();this.serializer = dep.serializer().newInstance();this.partitioner = dep.partitioner();this.writeMetrics = taskContext.taskMetrics().shuffleWriteMetrics();this.taskContext = taskContext;this.sparkConf = sparkConf;this.transferToEnabled = sparkConf.getBoolean("spark.file.transferTo", true);//排序器初始化缓存大小:4096this.initialSortBufferSize = sparkConf.getInt("spark.shuffle.sort.initialBufferSize", 4096);//输入缓存大小:默认32kthis.inputBufferSizeInBytes = (int)(Long)sparkConf.get(.MODULE$.SHUFFLE_FILE_BUFFER_SIZE()) * 1024;//输出缓存大小:默认32kthis.outputBufferSizeInBytes = (int)(Long)sparkConf.get(.MODULE$.SHUFFLE_UNSAFE_FILE_OUTPUT_BUFFER_SIZE()) * 1024;this.open();}}private void open() {assert this.sorter == null;//初始化排序器,指定缓存初始化大小4096this.sorter = new ShuffleExternalSorter(this.memoryManager, this.blockManager, this.taskContext, this.initialSortBufferSize, this.partitioner.numPartitions(), this.sparkConf, this.writeMetrics);//初始化系列化缓存,指定大小1Mthis.serBuffer = new UnsafeShuffleWriter.MyByteArrayOutputStream(1048576);//初始化序列化流this.serOutputStream = this.serializer.serializeStream(this.serBuffer);}
}

6.2. write

Notes:

  • UnsafeShuffleWriter provides two overloaded write functions; both go through write(scala.collection.Iterator<Product2<K, V>> records);
  • first, every record in the iterator is added to the sorter, one by one;
    • when the sorter's cached data reaches the spill condition, it is spilled to one temporary file;
  • then the sorter's data is written out to a single output file;
    • this produces one output file plus one matching index file;
  • finally the sorter's resources are released;
public class UnsafeShuffleWriter<K, V> extends ShuffleWriter<K, V> {public void write(Iterator<Product2<K, V>> records) throws IOException {//将java迭代器转为scala迭代器write(JavaConverters.asScalaIteratorConverter(records).asScala());}public void write(scala.collection.Iterator<Product2<K, V>> records) throws IOException {boolean success = false;try {//遍历记录,逐条将记录添加到排序器中while (records.hasNext()) {insertRecordIntoSorter(records.next());}//合并临时文件为一个整体文件,然后上次临时文件closeAndWriteOutput();success = true;} finally {if (sorter != null) {try {//释放资源sorter.cleanupResources();} catch (Exception e) {// Only throw this error if we won't be masking another// error.if (success) {throw e;} else {logger.error("In addition to a failure during writing, we failed during " +"cleanup.", e);}}}}}
}

6.2.1. insertRecordIntoSorter - adding data to the sorter

Notes:

  • first, the record is written into the serialization stream;
    • the stream buffers its data in a MyByteArrayOutputStream object; 1 MB by default;
    • a counter in the stream tracks how much data is buffered;
  • then the serialized bytes are handed to the sorter;
public class UnsafeShuffleWriter<K, V> extends ShuffleWriter<K, V> {private ShuffleExternalSorter sorter;//序列化缓存:默认大小1Mprivate MyByteArrayOutputStream serBuffer;//序列化流private SerializationStream serOutputStream;void insertRecordIntoSorter(Product2<K, V> record) throws IOException {assert(sorter != null);final K key = record._1();final int partitionId = partitioner.getPartition(key);//重置序列化缓存计数器:重置为0serBuffer.reset();//将key、value写入序列化流中serOutputStream.writeKey(key, OBJECT_CLASS_TAG);serOutputStream.writeValue(record._2(), OBJECT_CLASS_TAG);serOutputStream.flush();//确保序列化缓存中有数据final int serializedRecordSize = serBuffer.size();assert (serializedRecordSize > 0);//将序列化数据添加到排序器中sorter.insertRecord(serBuffer.getBuf(), Platform.BYTE_ARRAY_OFFSET, serializedRecordSize, partitionId);}
}

6.2.1.1. ShuffleExternalSorter#insertRecord

Notes:

  • 1. if the cached data has reached the spill condition, spill the cache to disk;
  • 2. if the in-memory sorter's buffer has reached the grow condition, grow it;
  • 3. acquire a new page to store the data if necessary;
  • 4. compute the record's storage address;
  • 5. write the record length into the page;
  • 6. copy the record bytes into the page;
  • 7. record the storage address and partition id in the in-memory sorter;

Summary (a small sketch of the in-page record layout follows the source listing below):

  • the record bytes are stored in the external sorter's linked list, whose elements are pages (MemoryBlock memory blocks);
  • the record addresses and partition ids are stored in the in-memory sorter's buffer;
  • spill condition: the number of records reaches Integer.MAX_VALUE;
  • each spill produces one temporary file;
final class ShuffleExternalSorter extends MemoryConsumer {public void insertRecord(Object recordBase, long recordOffset, int length, int partitionId) throws IOException {assert this.inMemSorter != null;//内存排序器中数据量 >= Integer.MAX_VALUEif (this.inMemSorter.numRecords() >= this.numElementsForSpillThreshold) {logger.info("Spilling data because number of spilledRecords crossed the threshold " + this.numElementsForSpillThreshold);//数据溢写到磁盘this.spill();}//内存排序器中存储数据的缓存扩容this.growPointerArrayIfNecessary();//数据对其,确定数据长度int uaoSize = UnsafeAlignedOffset.getUaoSize();int required = length + uaoSize;//申请新的page存储数据this.acquireNewPageIfNecessary(required);assert this.currentPage != null;//获取当前page中存储数据的对象Object base = this.currentPage.getBaseObject();//获取数据存储地址long recordAddress = this.taskMemoryManager.encodePageNumberAndOffset(this.currentPage, this.pageCursor);//将数据长度添加到pageUnsafeAlignedOffset.putSize(base, this.pageCursor, length);// 调整下page指针的内存地址this.pageCursor += (long)uaoSize;// 再将数据复制到page中Platform.copyMemory(recordBase, recordOffset, base, this.pageCursor, (long)length);// 调整下page光标位置this.pageCursor += (long)length;// 将数据在page中的存储地址和分区id记录到内存排序器中this.inMemSorter.insertRecord(recordAddress, partitionId);}
}
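
The in-page layout used by insertRecord is simply a length prefix followed by the serialized bytes, with pageCursor advancing past both. A minimal sketch of the same bookkeeping on a plain byte buffer (illustration only; Spark writes into MemoryBlock pages through Platform and UnsafeAlignedOffset, and the names here are made up):

import java.nio.ByteBuffer

object PageLayoutSketch extends App {
  val page = ByteBuffer.allocate(1024)    // stand-in for a MemoryBlock page
  var pageCursor = 0
  var recordOffsets = Vector.empty[Int]   // stand-in for the pointers kept by the in-memory sorter

  def append(record: Array[Byte]): Unit = {
    recordOffsets :+= pageCursor                  // "address" of the record = its offset in the page
    page.putInt(pageCursor, record.length)        // length prefix
    pageCursor += 4
    for (i <- record.indices) page.put(pageCursor + i, record(i))   // record bytes
    pageCursor += record.length
  }

  append("hello".getBytes("UTF-8"))
  append("shuffle".getBytes("UTF-8"))
  println(recordOffsets)   // Vector(0, 9): the second record starts after 4 + 5 bytes
}
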
6.2.1.1.1. ShuffleExternalSorter#spill - spilling cached data to disk

Notes:

  • check that the spill condition is met;
  • spill the data to a temporary file; each spill produces one temporary file;
    • sorting is done by ordering the record addresses cached in the in-memory sorter by partition id, ascending;
    • in the temporary file, the data of one partition is stored contiguously;
  • release the memory resources;
  • reset the in-memory sorter;
final class ShuffleExternalSorter extends MemoryConsumer {//从父类MemoryConsumer继承过来的public void spill() throws IOException {this.spill(9223372036854775807L, this);}public long spill(long size, MemoryConsumer trigger) throws IOException {//要求内存排序器缓存数据量 > 0if (trigger == this && this.inMemSorter != null && this.inMemSorter.numRecords() != 0) {logger.info("Thread {} spilling sort data of {} to disk ({} {} so far)", new Object[]{Thread.currentThread().getId(), Utils.bytesToString(this.getMemoryUsage()), this.spills.size(), this.spills.size() > 1 ? " times" : " time"});//排序并写入临时文件this.writeSortedFile(false);//释放内存资源long spillSize = this.freeMemory();//重置内存排序器this.inMemSorter.reset();//上报溢出切片文件大小this.taskContext.taskMetrics().incMemoryBytesSpilled(spillSize);return spillSize;} else {return 0L;}}
}
6.2.1.1.1.1. writeSortedFile - spilling data to a temporary file

Notes:

  • sort the record addresses cached in the in-memory sorter by the partition id embedded in each address, ascending, using RadixSort;
    • the partition id is stored in bytes 5~7 of each record address;
    • 3 bytes is 24 bits, so up to 2^24 (3*8=24) partition ids can be represented;
  • obtain the temporary spill file information from the blockManager;
  • obtain a disk writer from the blockManager;
  • iterate over the record addresses and write each address's record into the temporary file;
    • read the partition id out of the address;
    • when the partition changes, flush the previous partition's data to the temporary file and record that partition's size in the SpillInfo object;
    • follow the address into the external sorter's linked list of pages to find the page that stores the record;
    • iterate over the record bytes in the page and add them to the writer's serialization stream;
      • on each pass, at most one disk-write buffer's worth of data is read from the page into the buffer;
      • the buffer's contents are then added to the writer's serialization stream;
      • the disk-write buffer size is controlled by spark.shuffle.spill.diskWriteBufferSize; 1 MB by default;
    • once the record has been copied, tell the writer that one whole record has been written to the stream
  • when the iteration over the addresses finishes, flush the remaining data from the serialization stream into the temporary file;
  • close the writer;
  • record the last partition's size in the SpillInfo object, and add the SpillInfo object to the linked list of spill metadata;
  • report the spill metrics;

Summary:

  • sorting is implemented by sorting the record addresses held in the in-memory sorter's buffer;
  • the sorted addresses identify the pages in the external sorter's linked list that hold the records;
  • the records are written to the temporary file in the order of the sorted addresses;
  • because the addresses are sorted by partition id, ascending, the data of one partition ends up contiguous in the temporary file;
  • each record is first read into the disk-write buffer (1 MB by default) and then added to the serialization stream; a whole partition's data is flushed to the temporary file at once;

final class ShuffleExternalSorter extends MemoryConsumer {private void writeSortedFile(boolean isLastFile) {ShuffleWriteMetrics writeMetricsToUse;if (isLastFile) {//非切分文件//上报指标writeMetricsToUse = this.writeMetrics;} else {//切分文件//构建指标统计器writeMetricsToUse = new ShuffleWriteMetrics();}//数据排序ShuffleSorterIterator sortedRecords = this.inMemSorter.getSortedIterator();//构建磁盘写出缓存:通过spark.shuffle.spill.diskWriteBufferSize控制大小byte[] writeBuffer = new byte[this.diskWriteBufferSize];//从blockManager获取临时溢写文件信息Tuple2<TempShuffleBlockId, File> spilledFileInfo = this.blockManager.diskBlockManager().createTempShuffleBlock();File file = (File)spilledFileInfo._2();TempShuffleBlockId blockId = (TempShuffleBlockId)spilledFileInfo._1();//构建溢写信息对象SpillInfo spillInfo = new SpillInfo(this.numPartitions, file, blockId);SerializerInstance ser = DummySerializerInstance.INSTANCE;//从blockManager中获取磁盘写出器DiskBlockObjectWriter writer = this.blockManager.getDiskWriter(blockId, file, ser, this.fileBufferSizeBytes, writeMetricsToUse);//初始化当前分区idint currentPartition = -1;int uaoSize = UnsafeAlignedOffset.getUaoSize();//遍历数据存储地址while(sortedRecords.hasNext()) {//将地址数据加载到packedRecordPointer中sortedRecords.loadNext();//从地址数据中获取对应分区idint partition = sortedRecords.packedRecordPointer.getPartitionId();assert partition >= currentPartition;//分区号不同:代表上一个分区数据已经全部添加到写出去序列化流中if (partition != currentPartition) {//currentPartition != -1:不是处理第一个分区的数据if (currentPartition != -1) {//将上一个分区的数据从序列化流中flush到临时文件中FileSegment fileSegment = writer.commitAndGet();//将分区数据量缓存到溢写信息对象中spillInfo.partitionLengths[currentPartition] = fileSegment.length();}//更新当前分区idcurrentPartition = partition;}//根据内存排序器缓存的数据地址找到外部排序器通过链表缓存的pagelong recordPointer = sortedRecords.packedRecordPointer.getRecordPointer();Object recordPage = this.taskMemoryManager.getPage(recordPointer);//通过数据在page中的偏移量,计算数据长度long recordOffsetInPage = this.taskMemoryManager.getOffsetInPage(recordPointer);int dataRemaining = UnsafeAlignedOffset.getSize(recordPage, recordOffsetInPage);//遍历数据int toTransfer;for(long recordReadPosition = recordOffsetInPage + (long)uaoSize; dataRemaining > 0; dataRemaining -= toTransfer) {//计算每次处理的数据量:在磁盘写出缓存大小和数据大小见取小toTransfer = Math.min(this.diskWriteBufferSize, dataRemaining);//根据确定的处理数据量,从page中将数据赋值到写出缓存器中Platform.copyMemory(recordPage, recordReadPosition, writeBuffer, (long)Platform.BYTE_ARRAY_OFFSET, (long)toTransfer);//将写出缓冲器中数据添加到写出器的序列化流中writer.write(writeBuffer, 0, toTransfer);//数据读取偏移量更新recordReadPosition += (long)toTransfer;}//通知写入器已经向序列化流中写入了一条数据writer.recordWritten();}//数据遍历结束,将数据从序列化流中flush到临时文件中FileSegment committedSegment = writer.commitAndGet();//关闭写出器writer.close();if (currentPartition != -1) {//将分区数据量缓存到溢写信息对象中spillInfo.partitionLengths[currentPartition] = committedSegment.length();//将本次溢写信息对象添加到溢写信息对象链表中this.spills.add(spillInfo);}if (!isLastFile) {//上报指标this.writeMetrics.incRecordsWritten(writeMetricsToUse.recordsWritten());this.taskContext.taskMetrics().incDiskBytesSpilled(writeMetricsToUse.bytesWritten());}}
}
6.2.1.1.1.2. Sorting the spilled data

Notes:

  • what gets sorted are the record addresses cached in the in-memory sorter;
  • they are ordered by the partition id packed into each address; (a bit-level sketch of that packing follows the source listing below)
final class ShuffleInMemorySorter {public ShuffleInMemorySorter.ShuffleSorterIterator getSortedIterator() {int offset = 0;//默认基数排序if (this.useRadixSort) {//分区id存储在longArray的5~7这3个字节中;最多可以存2的24次方个分区id;offset = RadixSort.sort(this.array, (long)this.pos, 5, 7, false, false);} else {//TimSort排序MemoryBlock unused = new MemoryBlock(this.array.getBaseObject(), this.array.getBaseOffset() + (long)this.pos * 8L, (this.array.size() - (long)this.pos) * 8L);LongArray buffer = new LongArray(unused);Sorter<PackedRecordPointer, LongArray> sorter = new Sorter(new ShuffleSortDataFormat(buffer));sorter.sort(this.array, 0, this.pos, SORT_COMPARATOR);}return new ShuffleInMemorySorter.ShuffleSorterIterator(this.pos, this.array, offset);}
}
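
The packed 64-bit pointer keeps the partition id in its upper 24 bits (bytes 5~7), which is both what RadixSort keys on and where the 16777216-partition limit comes from. A minimal sketch of packing and unpacking such a pointer (illustration only; Spark's PackedRecordPointer also compresses the page number and in-page offset into the remaining 40 bits, and the names here are made up):

object PackedPointerSketch extends App {
  val MaxPartitionId = (1 << 24) - 1    // 16777215, hence the 16777216-partition limit

  def pack(partitionId: Int, compressedAddress: Long): Long = {
    require(partitionId <= MaxPartitionId)
    (partitionId.toLong << 40) | (compressedAddress & 0xFFFFFFFFFFL)   // low 40 bits: page + offset
  }

  def partitionIdOf(packed: Long): Int = ((packed >>> 40) & 0xFFFFFF).toInt

  val packed = pack(partitionId = 12345, compressedAddress = 0x1234L)
  println(partitionIdOf(packed))        // 12345
}
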
6.2.1.1.2. growPointerArrayIfNecessary - growing the in-memory sorter buffer

Notes:

  • first, check whether the in-memory sorter's buffer has reached the grow condition;
  • then, if it has, grow it:
    • compute the current buffer's memory usage;
    • allocate a new array buffer with twice the current capacity;
    • check again whether the existing buffer has room;
      • if it now has room (for example because another task released memory in the meantime), free the newly allocated array;
      • if it still has no room, make the new array the in-memory sorter's buffer;
final class ShuffleExternalSorter extends MemoryConsumer {private ShuffleInMemorySorter inMemSorter;private void growPointerArrayIfNecessary() throws IOException {assert this.inMemSorter != null;//判断是否达到扩容条件if (!this.inMemSorter.hasSpaceForAnotherRecord()) {//计算当前缓存容量long used = this.inMemSorter.getMemoryUsage();//扩容LongArray array;try {array = this.allocateArray(used / 8L * 2L);} catch (TooLargePageException var5) {this.spill();return;} catch (SparkOutOfMemoryError var6) {if (!this.inMemSorter.hasSpaceForAnotherRecord()) {logger.error("Unable to grow the pointer array");throw var6;}return;}//再次判断缓存是否够用:可能其他task释放了资源,从而缓存够用if (this.inMemSorter.hasSpaceForAnotherRecord()) {//这种情况下,释放刚申请的page资源this.freeArray(array);} else {//资源还是不够用,使用刚申请的page资源this.inMemSorter.expandPointerArray(array);}}}
}
6.2.1.1.2.1. Deciding whether to grow

Notes:

  • when the index of the most recently cached entry < the usable capacity, the buffer does not need to grow;
    • conversely, when that index >= the usable capacity, the buffer has to grow;
  • about the usable capacity
    • by default (radix sort) the usable capacity is half of the total capacity;
    • on the non-radix-sort path, the usable capacity is 2/3 of the total capacity;
    • for example, with the initial 4096-entry array the usable capacity is 2048 entries under radix sort and 2730 under TimSort;
  • about the buffer
    • a LongArray is used as the buffer;
final class ShuffleInMemorySorter {//缓存数据的数组private LongArray array;//记录数组中最新数据的索引private int pos = 0;//定义数组可使用容量private int usableCapacity = 0;ShuffleInMemorySorter(MemoryConsumer consumer, int initialSize, boolean useRadixSort) {//---------其他代码---------this.usableCapacity = this.getUsableCapacity();}//最新数据索引 < 可用容量:缓存不需要扩容public boolean hasSpaceForAnotherRecord() {return this.pos < this.usableCapacity;}//默认可用容量为总容量的一半;//如果使用非useRadixSort方案,可用容量为总容量的2/3private int getUsableCapacity() {return (int)((double)this.array.size() / (this.useRadixSort ? 2.0D : 1.5D));}}
6.2.1.1.2.2. Computing memory usage

Notes:

  • the memory usage is the length of the array times 8 bytes (each entry is one long);
final class ShuffleInMemorySorter {
  // memory usage = array length * 8 bytes
  public long getMemoryUsage() {
    return this.array.size() * 8L;
  }
}

public final class LongArray {
  // number of long entries backing this array
  public long size() {
    return this.length;
  }
}
6.2.1.1.2.3. allocateArray - growing the array

Notes:

  • allocate a new page (MemoryBlock) for the new capacity (size entries, i.e. size * 8 bytes)
  • wrap the page in a new LongArray
public abstract class MemoryConsumer {
  public LongArray allocateArray(long size) {
    long required = size * 8L;
    // allocate a new page (MemoryBlock) for the new capacity
    MemoryBlock page = this.taskMemoryManager.allocatePage(required, this);
    if (page == null || page.size() < required) {
      this.throwOom(page, required);
    }
    // update the memory usage accounting
    this.used += required;
    // wrap the page in a new array
    return new LongArray(page);
  }
}
6.2.1.1.2.4. expandPointerArray - migrating the data

Notes:

  • first, make sure the new buffer's capacity is larger than the old one's;
  • next, copy the data from the old array into the new array;
  • then, free the old array's resources;
  • after that, make the new array the in-memory sorter's buffer;
  • finally, recompute the in-memory sorter's usable capacity;
final class ShuffleInMemorySorter {
  public void expandPointerArray(LongArray newArray) {
    // make sure the new buffer's capacity is larger than the old one's
    assert newArray.size() > this.array.size();
    // copy the data from the old array to the new one
    Platform.copyMemory(this.array.getBaseObject(), this.array.getBaseOffset(),
        newArray.getBaseObject(), newArray.getBaseOffset(), (long)this.pos * 8L);
    // free the old array's resources
    this.consumer.freeArray(this.array);
    // the new array becomes the in-memory sorter's buffer
    this.array = newArray;
    // recompute the in-memory sorter's usable capacity
    this.usableCapacity = this.getUsableCapacity();
  }
}
6.2.1.1.3. acquireNewPageIfNecessary - acquiring a new page to store data

A new page is built when:

  • the current page is null;
  • the current page does not have enough room left;
  • either of the two is enough to require a new page;

Steps:

  • allocate a new page sized for the record and make it the current page;
  • reset the page cursor to the new page's base offset;
  • add the newly built page to the page list;
final class ShuffleExternalSorter extends MemoryConsumer {
  private void acquireNewPageIfNecessary(int required) {
    // the current page is null, or it does not have enough room left
    if (this.currentPage == null
        || this.pageCursor + (long)required > this.currentPage.getBaseOffset() + this.currentPage.size()) {
      // allocate a new page sized for the record and make it the current page
      this.currentPage = this.allocatePage((long)required);
      // reset the page cursor
      this.pageCursor = this.currentPage.getBaseOffset();
      // add the new page to the page list
      this.allocatedPages.add(this.currentPage);
    }
  }
}

6.2.2. closeAndWriteOutput - flushing the external sorter's cached data to disk

Notes:

  • update the peak memory usage;
  • flush the cached data to a temporary spill file on disk;
  • merge the temporary files (possibly several: one per spill) into one output file;
    • in the merged output file the data is ordered by partition id, ascending, with each partition's data contiguous;
  • create the index file for the output file;
    • it stores the data offset of every partition;
    • the offsets appear in the same order as the partitions in the data file, one to one;
  • record the shuffle output status;
public class UnsafeShuffleWriter<K, V> extends ShuffleWriter<K, V> {void closeAndWriteOutput() throws IOException {assert(sorter != null);// 更新内存使用峰值updatePeakMemoryUsed();serBuffer = null;serOutputStream = null;//将缓存中的数据落地到磁盘临时文件中final SpillInfo[] spills = sorter.closeAndGetSpills();sorter = null;final long[] partitionLengths;//获取输出数据文件final File output = shuffleBlockResolver.getDataFile(shuffleId, mapId);//构建输出数据文件的临时文件final File tmp = Utils.tempFileWith(output);try {try {//合并临时文件partitionLengths = mergeSpills(spills, tmp);} finally {for (SpillInfo spill : spills) {if (spill.file.exists() && ! spill.file.delete()) {logger.error("Error while deleting spill file {}", spill.file.getPath());}}}//创建输出文件对应的index文件shuffleBlockResolver.writeIndexFileAndCommit(shuffleId, mapId, partitionLengths, tmp);} finally {if (tmp.exists() && !tmp.delete()) {logger.error("Error while deleting temp file {}", tmp.getAbsolutePath());}}//记录shuffle状态信息mapStatus = MapStatus$.MODULE$.apply(blockManager.shuffleServerId(), partitionLengths);}
}

6.2.2.1. closeAndGetSpills - force-flushing the cache to disk

Notes:

  • using the record addresses cached in the in-memory sorter, write the corresponding records out to a temporary file;
  • release the memory resources and the in-memory sorter;
  • return the array of all SpillInfo objects;
    • each SpillInfo object stores the amount of data per partition for its spill file;
final class ShuffleExternalSorter extends MemoryConsumer {public SpillInfo[] closeAndGetSpills() throws IOException {if (this.inMemSorter != null) {//将缓存中的数据溢写到磁盘this.writeSortedFile(true);//释放内存资源this.freeMemory();//释放内存排序器this.inMemSorter.free();this.inMemSorter = null;}//返回溢写信息对象数组return (SpillInfo[])this.spills.toArray(new SpillInfo[this.spills.size()]);}
}

6.2.2.2. mergeSpills - merging spill files

Notes:

  • no spill file: create an empty output file;
  • exactly one spill file: move and rename it into the output file;
  • several spill files: perform a real merge;
    • fast merge
      • requires fast merge to be both enabled and supported;
        • enabled via spark.shuffle.unsafe.fastMergeEnabled (true by default); supported when shuffle compression is disabled or the codec supports concatenation of serialized streams;
        • spark.file.transferTo decides whether the fast merge is transferTo-based, which is the default;
        • otherwise the fast merge is file-stream based;
    • slow merge
      • if a fast merge cannot be used, the slow merge (going through compressed streams) is used;
  • return the array of per-partition data sizes;
public class UnsafeShuffleWriter<K, V> extends ShuffleWriter<K, V> {private long[] mergeSpills(SpillInfo[] spills, File outputFile) throws IOException {//是否压缩:默认是,通过spark.shuffle.compress设置boolean compressionEnabled = this.sparkConf.getBoolean("spark.shuffle.compress", true);//压缩方式编码:默认LZ4CompressionCodec,通过spark.io.compression.codec设置CompressionCodec compressionCodec = org.apache.spark.io.CompressionCodec..MODULE$.createCodec(this.sparkConf);//是否启用fast merge:默认是boolean fastMergeEnabled = this.sparkConf.getBoolean("spark.shuffle.unsafe.fastMergeEnabled", true);//是否支持 fast merge://1、不启用压缩算法//2、或者SnappyCompressionCodec、LZFCompressionCodec、LZ4CompressionCodec、ZStdCompressionCodec这4种压缩算法之一boolean fastMergeIsSupported = !compressionEnabled || org.apache.spark.io.CompressionCodec..MODULE$.supportsConcatenationOfSerializedStreams(compressionCodec);//是否启用加密:默认否,通过 spark.io.encryption.enabled 参数来设置boolean encryptionEnabled = this.blockManager.serializerManager().encryptionEnabled();try {if (spills.length == 0) {//没有溢写分解// 创建一个空文件(new FileOutputStream(outputFile)).close();//返回全是0的分区数据量数组return new long[this.partitioner.numPartitions()];} else if (spills.length == 1) {//一个溢写文件//临时文件迁移并重名为输出文件Files.move(spills[0].file, outputFile);//返回分区数据量数组return spills[0].partitionLengths;} else {//多个溢写文件long[] partitionLengths;//fast mergeif (fastMergeEnabled && fastMergeIsSupported) {//基于传输&&不加密方式快速合并:默认方式;if (this.transferToEnabled && !encryptionEnabled) {//基于传输的合并logger.debug("Using transferTo-based fast merge");partitionLengths = this.mergeSpillsWithTransferTo(spills, outputFile);} else {//基于文件流的合并logger.debug("Using fileStream-based fast merge");partitionLengths = this.mergeSpillsWithFileStream(spills, outputFile, (CompressionCodec)null);}} else {//慢合并logger.debug("Using slow merge");partitionLengths = this.mergeSpillsWithFileStream(spills, outputFile, compressionCodec);}//写出指标统计this.writeMetrics.decBytesWritten(spills[spills.length - 1].file.length());this.writeMetrics.incBytesWritten(outputFile.length());//返回分区数据量数组return partitionLengths;}} catch (IOException var9) {if (outputFile.exists() && !outputFile.delete()) {logger.error("Unable to delete output file {}", outputFile.getPath());}throw var9;}}
}

6.3. Sorters

6.3.1. ShuffleExternalSorter - the external sorter

Notes:

  • ShuffleExternalSorter is a subclass of MemoryConsumer;
  • constructing a ShuffleExternalSorter instance first invokes the MemoryConsumer superclass constructor;
  • file buffer size: 32 KB by default;
  • spill threshold: Integer.MAX_VALUE;
  • disk-write buffer: 1 MB by default;
  • an in-memory sorter (ShuffleInMemorySorter) is built and kept inside the external sorter;
    • its buffer is initialized with 4096 entries;
    • radix sort is used by default;

Summary:

  • inside the external sorter, the data is stored in a linked list;
    • the list elements are pages, which are really MemoryBlock memory blocks;
    • all record bytes live in these pages;
  • currentPage points at the newest page, the current page;
  • pageCursor points at the current write offset inside the page;
final class ShuffleExternalSorter extends MemoryConsumer {private static final Logger logger = LoggerFactory.getLogger(ShuffleExternalSorter.class);@VisibleForTestingstatic final int DISK_WRITE_BUFFER_SIZE = 1048576;private final int numPartitions;private final TaskMemoryManager taskMemoryManager;private final BlockManager blockManager;private final TaskContext taskContext;private final ShuffleWriteMetrics writeMetrics;//溢写阈值private final int numElementsForSpillThreshold;//文件缓冲区private final int fileBufferSizeBytes;//磁盘写出缓冲区private final int diskWriteBufferSize;//存储数据的page最多可以使用2^13个页表private final LinkedList<MemoryBlock> allocatedPages = new LinkedList();//溢出文件的元数据信息的列表private final LinkedList<SpillInfo> spills = new LinkedList();private long peakMemoryUsedBytes;@Nullableprivate ShuffleInMemorySorter inMemSorter;//当前使用的page@Nullableprivate MemoryBlock currentPage = null;//Page的光标private long pageCursor = -1L;//构造函数ShuffleExternalSorter(TaskMemoryManager memoryManager, BlockManager blockManager, TaskContext taskContext, int initialSize, int numPartitions, SparkConf conf, ShuffleWriteMetrics writeMetrics) {//构造一个MemoryConsumer实例化对象super(memoryManager, (long)((int)Math.min(134217728L, memoryManager.pageSizeBytes())), memoryManager.getTungstenMemoryMode());this.taskMemoryManager = memoryManager;this.blockManager = blockManager;this.taskContext = taskContext;//确定reduce的分区数目this.numPartitions = numPartitions;//指定文件缓存大小:默认32kthis.fileBufferSizeBytes = (int)(Long)conf.get(.MODULE$.SHUFFLE_FILE_BUFFER_SIZE()) * 1024;//指定溢写阈值:Integer.MAXthis.numElementsForSpillThreshold = (Integer)conf.get(.MODULE$.SHUFFLE_SPILL_NUM_ELEMENTS_FORCE_SPILL_THRESHOLD());this.writeMetrics = writeMetrics;//构建一个内存排序器,并维护在当前排序器中this.inMemSorter = new ShuffleInMemorySorter(this, initialSize, conf.getBoolean("spark.shuffle.sort.useRadixSort", true));//内存使用情况this.peakMemoryUsedBytes = this.getMemoryUsage();//磁盘写出缓冲区:默认1Mthis.diskWriteBufferSize = (int)(Long)conf.get(.MODULE$.SHUFFLE_DISK_WRITE_BUFFER_SIZE());}

6.3.1.1. ShuffleInMemorySorter - the in-memory sorter

Notes:

  • a LongArray is used as the in-memory buffer for the cached data (the record pointers);
  • the array's initial capacity is 4096 entries;
  • the usable capacity depends on the sort algorithm:
    • with radix sort, the usable capacity is half of the total capacity;
    • on the non-radix-sort path, the usable capacity is 2/3 of the total capacity;
  • default comparator: ascending by partition id;

Summary:

  • the in-memory sorter caches the records' storage addresses in a LongArray object;
    • the LongArray itself stores its data in a MemoryBlock memory block;
final class ShuffleInMemorySorter {//初始化排序规则:默认根据分区Id升序排列private static final ShuffleInMemorySorter.SortComparator SORT_COMPARATOR = new ShuffleInMemorySorter.SortComparator();private final MemoryConsumer consumer;private LongArray array;private final boolean useRadixSort;private int pos = 0;private int usableCapacity = 0;private final int initialSize;ShuffleInMemorySorter(MemoryConsumer consumer, int initialSize, boolean useRadixSort) {this.consumer = consumer;assert initialSize > 0;//缓存初始化大小4096this.initialSize = initialSize;//默认根据useRadixSort排序this.useRadixSort = useRadixSort;//构建一个4096大小的LongArray作为排序器数据缓存this.array = consumer.allocateArray((long)initialSize);//可用容量this.usableCapacity = this.getUsableCapacity();}//内部类:排序比较器private static final class SortComparator implements Comparator<PackedRecordPointer> {private SortComparator() {}//默认根据分区id进行升序排列public int compare(PackedRecordPointer left, PackedRecordPointer right) {int leftId = left.getPartitionId();int rightId = right.getPartitionId();return leftId < rightId ? -1 : (leftId > rightId ? 1 : 0);}}
}

6.3.1.1.1. Computing usable capacity

Notes:

  • with radix sort, the usable capacity is half of the total capacity;
  • on the non-radix-sort path, the usable capacity is 2/3 of the total capacity;
final class ShuffleInMemorySorter {
  private static final ShuffleInMemorySorter.SortComparator SORT_COMPARATOR =
      new ShuffleInMemorySorter.SortComparator();
  private final MemoryConsumer consumer;
  private LongArray array;
  private final boolean useRadixSort;
  private int pos = 0;
  private int usableCapacity = 0;
  private final int initialSize;

  // with radix sort the usable capacity is half of the total capacity; otherwise it is 2/3
  private int getUsableCapacity() {
    return (int)((double)this.array.size() / (this.useRadixSort ? 2.0D : 1.5D));
  }
}

6.4. Summary

When it applies:

  • the serializer supports relocation of serialized objects;
  • no map-side aggregation
  • the partition count is at most 16777216

Write flow:

  • first, every record in the iterator is added to the sorter, one by one;
    • when the sorter's cached data reaches the spill condition, it is spilled to one temporary file;
  • then the sorter's data is written out to a single output file;
    • this produces one output file plus one matching index file;
  • finally the sorter's resources are released;

Sorters:

  • external sorter
    • caches the record bytes
    • the cache is a linked list whose elements are pages (MemoryBlock memory blocks);
  • in-memory sorter
    • caches the record addresses
      • bytes 5~7 of each address store the partition id;
      • so up to 2^24 (16777216) partition ids can be stored;
    • the cache is a LongArray (which itself stores its data in a MemoryBlock);
  • how sorting is implemented
    • the record addresses cached in the in-memory sorter are sorted by partition id, ascending, with RadixSort;
  • spilling
    • when the number of record addresses in the in-memory buffer reaches Integer.MAX_VALUE, a spill is triggered;
    • each spill produces one temporary file;

Output:

  • one write produces one data output file plus one index file
  • output file
    • the data is ordered by partition id, ascending;
    • one partition's data is contiguous;
  • index file
    • stores the data offset of every partition in the data file;
    • its entries correspond one to one with the partitions in the data file;

7. Creating the index file for the data file

Notes:

  • the index file records the data offset of each partition; (a small sketch of how a reducer reads it back follows the source listing below)
  • the offsets recorded in the index file correspond to the partitions in the data file; both files keep the partitions in the same ascending order;
private[spark] class IndexShuffleBlockResolver(conf: SparkConf,_blockManager: BlockManager = null)extends ShuffleBlockResolverwith Logging {def writeIndexFileAndCommit(shuffleId: Int,mapId: Int,lengths: Array[Long],dataTmp: File): Unit = {//创建index文件val indexFile = getIndexFile(shuffleId, mapId)val indexTmp = Utils.tempFileWith(indexFile)try {//获取数据文件val dataFile = getDataFile(shuffleId, mapId)//每个执行器只有一个IndexShuffleBlockResolver,这个同步确保下面的检查和重命名是原子的.synchronized {//检查索引文件和数据文件val existingLengths = checkIndexAndDataFile(indexFile, dataFile, lengths.length)if (existingLengths != null) {System.arraycopy(existingLengths, 0, lengths, 0, lengths.length)//如果相关的index已经存在, 就可以直接退出了, 这是因为这个mapTask可能已经运行过了. // 当然也可能因为其它原因失败, 但总之这次写是不成功的, 直接删除tmp文件完事if (dataTmp != null && dataTmp.exists()) {dataTmp.delete()}} else {// 创建面向index临时文件的数据输出流val out = new DataOutputStream(new BufferedOutputStream(new FileOutputStream(indexTmp)))Utils.tryWithSafeFinally {// We take in lengths of each block, need to convert it to offsets.var offset = 0Lout.writeLong(offset)//遍历分区数据量数组for (length <- lengths) {//更新每个分区数据偏移量offset += length//将分区数据偏移量依次写入index文件out.writeLong(offset)}} {out.close()}//索引文件删除if (indexFile.exists()) {indexFile.delete()}//数据文件删除if (dataFile.exists()) {dataFile.delete()}//将索引临时文件改名为索引文件if (!indexTmp.renameTo(indexFile)) {throw new IOException("fail to rename file " + indexTmp + " to " + indexFile)}if (dataTmp != null && dataTmp.exists() && !dataTmp.renameTo(dataFile)) {throw new IOException("fail to rename file " + dataTmp + " to " + dataFile)}}}} finally {if (indexTmp.exists() && !indexTmp.delete()) {logError(s"Failed to delete temporary index file at ${indexTmp.getAbsolutePath}")}}}
}
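
On the read side, a reducer only needs two consecutive offsets from the index file to locate its block in the data file: offset = offsets[reduceId] and length = offsets[reduceId + 1] - offsets[reduceId]. A minimal sketch of reading those two longs (illustration only; blockLocationSketch is a made-up name, and in Spark this lookup is done by the IndexShuffleBlockResolver when serving block data):

import java.io.{DataInputStream, File, FileInputStream}

object IndexLookupSketch {
  // The index file is just a sequence of longs: 0, cumulative length of partition 0,
  // cumulative length of partitions 0..1, and so on.
  def blockLocationSketch(indexFile: File, reduceId: Int): (Long, Long) = {
    val in = new DataInputStream(new FileInputStream(indexFile))
    try {
      in.skipBytes(reduceId * 8)        // jump to offsets[reduceId]
      val offset = in.readLong()
      val nextOffset = in.readLong()    // offsets[reduceId + 1]
      (offset, nextOffset - offset)     // start of this reducer's block and its length
    } finally in.close()
  }
}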

8. Summary

Overall flow:

  • when an RDD dependency is built and it is a wide dependency, its shuffleHandle field is initialized
    • at that point a handle is registered with the shuffleManager, and a different ShuffleHandle subclass is instantiated depending on the situation
  • when the Executor runs a shuffle task, the task's result data is written to disk through a ShuffleWriter
    • the concrete ShuffleWriter subclass is obtained from the ShuffleManager according to the concrete type of the dependency's shuffleHandle field (the ShuffleHandle object)
    • writing to disk is done by calling ShuffleWriter.write()
    • one shuffle task execution produces one data file plus one index file on disk

The ShuffleWriter types:

  • BypassMergeSortShuffleWriter
    • handle: BypassMergeSortShuffleHandle
    • applies when: no map-side aggregation and the partition count is at most 200
    • effect: writes numPartitions files directly and concatenates them at the end
    • advantage: avoids serializing and deserializing twice in order to merge spilled files
    • drawback: many files are open at the same time, so more memory is allocated for buffers;
  • SortShuffleWriter
    • handle: BaseShuffleHandle
    • applies when: neither of the other two writers applies;
    • effect: buffers map output in deserialized form
    • features: supports map-side aggregation and sorting
  • UnsafeShuffleWriter
    • handle: SerializedShuffleHandle
    • applies when:
      • the serializer supports relocation of serialized objects;
      • no map-side aggregation
      • the partition count is at most 16777216
    • effect: buffers map output in serialized form; supports sorting

9. References

源码解析Spark各个ShuffleWriter的实现机制(四)——UnsafeShuffleWriter

Spark Shuffle参数调优的原理与建议

Spark基本sort shuffle write流程解析
