SortShuffleWriter is one of the shuffle write implementations in Spark. Below is its write() method.

override def write(records: Iterator[Product2[K, V]]): Unit = {
  sorter = if (dep.mapSideCombine) {
    new ExternalSorter[K, V, C](
      context, dep.aggregator, Some(dep.partitioner), dep.keyOrdering, dep.serializer)
  } else {
    // In this case we pass neither an aggregator nor an ordering to the sorter, because we don't
    // care whether the keys get sorted in each partition; that will be done on the reduce side
    // if the operation being run is sortByKey.
    new ExternalSorter[K, V, V](
      context, aggregator = None, Some(dep.partitioner), ordering = None, dep.serializer)
  }
  sorter.insertAll(records)

  // Don't bother including the time to open the merged output file in the shuffle write time,
  // because it just opens a single file, so is typically too fast to measure accurately
  // (see SPARK-3570).
  val output = shuffleBlockResolver.getDataFile(dep.shuffleId, mapId)
  val tmp = Utils.tempFileWith(output)
  try {
    val blockId = ShuffleBlockId(dep.shuffleId, mapId, IndexShuffleBlockResolver.NOOP_REDUCE_ID)
    val partitionLengths = sorter.writePartitionedFile(blockId, tmp)
    shuffleBlockResolver.writeIndexFileAndCommit(dep.shuffleId, mapId, partitionLengths, tmp)
    mapStatus = MapStatus(blockManager.shuffleServerId, partitionLengths, writeMetrics.recordsWritten)
  } finally {
    if (tmp.exists() && !tmp.delete()) {
      logError(s"Error while deleting temp file ${tmp.getAbsolutePath}")
    }
  }
}

First, write() constructs the ExternalSorter differently depending on whether the incoming records need to be combined on the map side (dep.mapSideCombine); all of the records are then fed to the sorter through ExternalSorter's insertAll() method.
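Which branch a job takes is decided by its shuffle dependency. The following sketch is ours, not part of the Spark source (it assumes a local SparkContext and Spark on the classpath), and only illustrates typical operators on each side of the branch:

import org.apache.spark.{SparkConf, SparkContext}

object MapSideCombineExample {
  def main(args: Array[String]): Unit = {
    val sc = new SparkContext(new SparkConf().setAppName("map-side-combine").setMaster("local[2]"))
    val pairs = sc.parallelize(Seq(("a", 1), ("b", 2), ("a", 3)), numSlices = 2)

    // reduceByKey defines an aggregator and keeps map-side combine enabled, so its shuffle
    // takes the first branch: ExternalSorter[K, V, C] built with the aggregator.
    val reduced = pairs.reduceByKey(_ + _).collect()

    // groupByKey disables map-side combine, and sortByKey passes no aggregator at all;
    // both take the second branch (ExternalSorter[K, V, V] with neither aggregator nor
    // ordering), sortByKey doing its actual sorting on the reduce side.
    val grouped = pairs.groupByKey().collect()
    val sorted  = pairs.sortByKey().collect()

    sc.stop()
  }
}

With the sorter chosen, insertAll() looks like this: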

def insertAll(records: Iterator[Product2[K, V]]): Unit = {
  // TODO: stop combining if we find that the reduction factor isn't high
  val shouldCombine = aggregator.isDefined

  if (shouldCombine) {
    // Combine values in-memory first using our AppendOnlyMap
    val mergeValue = aggregator.get.mergeValue
    val createCombiner = aggregator.get.createCombiner
    var kv: Product2[K, V] = null
    val update = (hadValue: Boolean, oldValue: C) => {
      if (hadValue) mergeValue(oldValue, kv._2) else createCombiner(kv._2)
    }
    while (records.hasNext) {
      addElementsRead()
      kv = records.next()
      map.changeValue((getPartition(kv._1), kv._1), update)
      maybeSpillCollection(usingMap = true)
    }
  } else {
    // Stick values into our buffer
    while (records.hasNext) {
      addElementsRead()
      val kv = records.next()
      buffer.insert(getPartition(kv._1), kv._1, kv._2.asInstanceOf[C])
      maybeSpillCollection(usingMap = false)
    }
  }
}

Inside insertAll(), a different in-memory collection is used to buffer the data depending on whether aggregation is required: the map when combining, otherwise the buffer.

The map and the buffer are declared as follows:

@volatile private var map = new PartitionedAppendOnlyMap[K, C]
@volatile private var buffer = new PartitionedPairBuffer[K, C]

To see how the two differ, start with PartitionedPairBuffer's insert() method.

def insert(partition: Int, key: K, value: V): Unit = {
  if (curSize == capacity) {
    growArray()
  }
  data(2 * curSize) = (partition, key.asInstanceOf[AnyRef])
  data(2 * curSize + 1) = value.asInstanceOf[AnyRef]
  curSize += 1
  afterUpdate()
}

Its underlying storage is a single array in which each record occupies two slots: the first holds a (partition id, key) tuple and the second holds the value itself. Because no aggregation has to be considered, a new record is simply appended at the end of the array, which is grown when it runs out of capacity.
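The layout is easy to picture with a stripped-down sketch (ours, not the Spark class): record i lives in slots 2*i and 2*i + 1 of one flat Array[AnyRef].

// A minimal sketch of the PartitionedPairBuffer layout, assuming boxed keys and values.
class TinyPairBuffer[K, V](initialCapacity: Int = 64) {
  private var capacity = initialCapacity
  private var curSize = 0
  private var data = new Array[AnyRef](2 * capacity)

  def insert(partition: Int, key: K, value: V): Unit = {
    if (curSize == capacity) grow()
    data(2 * curSize) = (partition, key)                 // even slot: (partition id, key)
    data(2 * curSize + 1) = value.asInstanceOf[AnyRef]   // odd slot: the value
    curSize += 1
  }

  private def grow(): Unit = {                           // double the backing array
    val newData = new Array[AnyRef](4 * capacity)
    System.arraycopy(data, 0, newData, 0, 2 * capacity)
    data = newData
    capacity *= 2
  }
}

After insert(0, "a", 1) and insert(2, "b", 5) the array holds (0,"a"), 1, (2,"b"), 5, ... which is the flat, append-only shape that later gets sorted in place.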

Next is PartitionedAppendOnlyMap's insert() method, together with the update() it delegates to.

def insert(partition: Int, key: K, value: V): Unit = {
  update((partition, key), value)
}

def update(key: K, value: V): Unit = {
  assert(!destroyed, destructionMessage)
  val k = key.asInstanceOf[AnyRef]
  if (k.eq(null)) {
    if (!haveNullValue) {
      incrementSize()
    }
    nullValue = value
    haveNullValue = true
    return
  }
  var pos = rehash(key.hashCode) & mask
  var i = 1
  while (true) {
    val curKey = data(2 * pos)
    if (curKey.eq(null)) {
      data(2 * pos) = k
      data(2 * pos + 1) = value.asInstanceOf[AnyRef]
      incrementSize()  // Since we added a new key
      return
    } else if (k.eq(curKey) || k.equals(curKey)) {
      data(2 * pos + 1) = value.asInstanceOf[AnyRef]
      return
    } else {
      val delta = i
      pos = (pos + delta) & mask
      i += 1
    }
  }
}

Although this collection is also backed by an array, keys are placed by hashing the (partition, key) pair, and collisions are resolved by open addressing with a probe step that grows by one on every collision (positions pos + 1, pos + 3, pos + 6, ..., i.e. quadratic probing over triangular numbers). Because an existing key's value slot can be rewritten in place, this structure suits the scenarios that need map-side aggregation.
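The probing and the in-place merge are easier to see in a reduced sketch (ours, not Spark's AppendOnlyMap; it skips growth, size tracking and the null-key path):

// Open addressing over a flat array; the probe step grows by 1 per collision, so the probed
// positions are pos+1, pos+3, pos+6, ... (triangular numbers). changeValue() merges in place.
class TinyAppendOnlyMap[K, V](capacity: Int = 64) {   // capacity must be a power of two
  private val mask = capacity - 1
  private val data = new Array[AnyRef](2 * capacity)

  def changeValue(key: K, update: (Boolean, V) => V): V = {
    val k = key.asInstanceOf[AnyRef]
    var pos = k.hashCode & mask
    var i = 1
    while (true) {
      val curKey = data(2 * pos)
      if (curKey == null) {                        // empty slot: create a new combiner
        val created = update(false, null.asInstanceOf[V])
        data(2 * pos) = k
        data(2 * pos + 1) = created.asInstanceOf[AnyRef]
        return created
      } else if (curKey == k) {                    // same key: merge the value in place
        val merged = update(true, data(2 * pos + 1).asInstanceOf[V])
        data(2 * pos + 1) = merged.asInstanceOf[AnyRef]
        return merged
      } else {                                     // collision: move on by an increasing step
        pos = (pos + i) & mask
        i += 1
      }
    }
    throw new IllegalStateException("unreachable")
  }
}

For word-count-style combining, map.changeValue("spark", (had, old) => if (had) old + 1 else 1) either seeds the counter or adds to it, which is what insertAll() does through the update closure built from createCombiner and mergeValue.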

Back in insertAll(), after every insertion into the map or the buffer, maybeSpillCollection() is called to decide whether the in-memory collection has to be spilled to a temporary file.

private def maybeSpillCollection(usingMap: Boolean): Unit = {
  var estimatedSize = 0L
  if (usingMap) {
    estimatedSize = map.estimateSize()
    if (maybeSpill(map, estimatedSize)) {
      map = new PartitionedAppendOnlyMap[K, C]
    }
  } else {
    estimatedSize = buffer.estimateSize()
    if (maybeSpill(buffer, estimatedSize)) {
      buffer = new PartitionedPairBuffer[K, C]
    }
  }
  if (estimatedSize > _peakMemoryUsedBytes) {
    _peakMemoryUsedBytes = estimatedSize
  }
}

protected def maybeSpill(collection: C, currentMemory: Long): Boolean = {
  var shouldSpill = false
  if (elementsRead % 32 == 0 && currentMemory >= myMemoryThreshold) {
    // Claim up to double our current memory from the shuffle memory pool
    val amountToRequest = 2 * currentMemory - myMemoryThreshold
    val granted = acquireMemory(amountToRequest)
    myMemoryThreshold += granted
    // If we were granted too little memory to grow further (either tryToAcquire returned 0,
    // or we already had more memory than myMemoryThreshold), spill the current collection
    shouldSpill = currentMemory >= myMemoryThreshold
  }
  shouldSpill = shouldSpill || _elementsRead > numElementsForceSpillThreshold
  // Actually spill
  if (shouldSpill) {
    _spillCount += 1
    logSpillage(currentMemory)
    spill(collection)
    _elementsRead = 0
    _memoryBytesSpilled += currentMemory
    releaseMemory()
  }
  shouldSpill
}

In maybeSpill(), the check runs every 32 records once the collection's estimated size reaches the current memory threshold: the sorter asks the memory manager for enough extra execution memory to double its footprint, and if the threshold still cannot be raised above the current size (the request was granted only partially or not at all), or the number of records read exceeds numElementsForceSpillThreshold, the current collection is spilled.
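The arithmetic is easiest to follow with concrete, made-up numbers; the sketch below just replays the decision and is not Spark code:

object MaybeSpillArithmetic {
  def main(args: Array[String]): Unit = {
    // Suppose the collection is estimated at 64 MB and the current threshold is 32 MB.
    val currentMemory = 64L * 1024 * 1024
    var myMemoryThreshold = 32L * 1024 * 1024

    // Ask for enough memory to double the footprint: 2 * 64 - 32 = 96 MB.
    val amountToRequest = 2 * currentMemory - myMemoryThreshold

    // Pretend the memory manager only grants 16 MB (in Spark this is acquireMemory()'s result).
    val granted = 16L * 1024 * 1024
    myMemoryThreshold += granted                         // threshold is now 48 MB

    // 64 MB in use >= 48 MB threshold, so the collection cannot keep growing and must spill.
    val shouldSpill = currentMemory >= myMemoryThreshold
    println(s"amountToRequest = $amountToRequest, shouldSpill = $shouldSpill")
  }
}

When shouldSpill comes out true, the actual work is delegated to spill():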

override protected[this] def spill(collection: WritablePartitionedPairCollection[K, C]): Unit = {
  val inMemoryIterator = collection.destructiveSortedWritablePartitionedIterator(comparator)
  val spillFile = spillMemoryIteratorToDisk(inMemoryIterator)
  spills += spillFile
}

In spill(), the in-memory data is sorted and written out to a temporary spill file; the sorting is driven by destructiveSortedWritablePartitionedIterator().

def destructiveSortedWritablePartitionedIterator(keyComparator: Option[Comparator[K]])
    : WritablePartitionedIterator = {
  val it = partitionedDestructiveSortedIterator(keyComparator)
  new WritablePartitionedIterator {
    private[this] var cur = if (it.hasNext) it.next() else null

    def writeNext(writer: DiskBlockObjectWriter): Unit = {
      writer.write(cur._1._2, cur._2)
      cur = if (it.hasNext) it.next() else null
    }

    def hasNext(): Boolean = cur != null

    def nextPartition(): Int = cur._1._1
  }
}

Here, partitionedDestructiveSortedIterator() returns an iterator over the collection whose contents have already been sorted (destroying the collection in the process, as the name suggests).

The comparators that define the sort order are:

def partitionComparator[K]: Comparator[(Int, K)] = new Comparator[(Int, K)] {
  override def compare(a: (Int, K), b: (Int, K)): Int = {
    a._1 - b._1
  }
}

/**
 * A comparator for (Int, K) pairs that orders them both by their partition ID and a key ordering.
 */
def partitionKeyComparator[K](keyComparator: Comparator[K]): Comparator[(Int, K)] = {
  new Comparator[(Int, K)] {
    override def compare(a: (Int, K), b: (Int, K)): Int = {
      val partitionDiff = a._1 - b._1
      if (partitionDiff != 0) {
        partitionDiff
      } else {
        keyComparator.compare(a._2, b._2)
      }
    }
  }
}

The logic is straightforward: entries are compared by partition id first, and then, when a key comparator is supplied, by key within the same partition.
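A small usage sketch (names are ours) shows the resulting order; it mirrors partitionKeyComparator with a plain string comparator for the keys:

import java.util.Comparator

object PartitionKeyOrderExample {
  def main(args: Array[String]): Unit = {
    val cmp = new Comparator[(Int, String)] {
      override def compare(a: (Int, String), b: (Int, String)): Int = {
        val partitionDiff = a._1 - b._1
        if (partitionDiff != 0) partitionDiff else a._2.compareTo(b._2)
      }
    }

    val keys = Array((2, "b"), (0, "z"), (0, "a"), (2, "a"))
    java.util.Arrays.sort(keys, cmp)
    keys.foreach(println)   // (0,a), (0,z), (2,a), (2,b): partitions first, keys inside a partition
  }
}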

Once the sorted iterator has been obtained, its contents are written to the temporary file in order, the memory held by the collection is released, and a fresh map/buffer is created, completing one spill.

Back in SortShuffleWriter's write() method: after insertAll() has put the data into memory and, possibly, into spill files, everything has to be merged into a single output file.

val output = shuffleBlockResolver.getDataFile(dep.shuffleId, mapId)
val tmp = Utils.tempFileWith(output)
try {
  val blockId = ShuffleBlockId(dep.shuffleId, mapId, IndexShuffleBlockResolver.NOOP_REDUCE_ID)
  val partitionLengths = sorter.writePartitionedFile(blockId, tmp)
  shuffleBlockResolver.writeIndexFileAndCommit(dep.shuffleId, mapId, partitionLengths, tmp)
  mapStatus = MapStatus(blockManager.shuffleServerId, partitionLengths, writeMetrics.recordsWritten)
} finally {
  if (tmp.exists() && !tmp.delete()) {
    logError(s"Error while deleting temp file ${tmp.getAbsolutePath}")
  }
}

The core of the merge lives in the sorter's writePartitionedFile() method.

In writePartitionedFile(), if the data never grew large enough to trigger a spill, everything to be merged still sits in the in-memory collection, so the method only needs to repeat the same kind of sorted write a spill performs, this time into the final data file. If spills did happen, the spill files and whatever remains in memory must be merged together into the final file; that branch iterates over partitionedIterator:

for ((id, elements) <- this.partitionedIterator) {
  if (elements.hasNext) {
    for (elem <- elements) {
      writer.write(elem._1, elem._2)
    }
    val segment = writer.commitAndGet()
    lengths(id) = segment.length
  }
}

As the loop shows, a per-partition iterator is obtained, and each partition's data is written into the result file in partition order, with the length of every partition segment recorded in lengths.

The core logic behind partitionedIterator is the merge() method.

private def merge(spills: Seq[SpilledFile], inMemory: Iterator[((Int, K), C)])
    : Iterator[(Int, Iterator[Product2[K, C]])] = {
  val readers = spills.map(new SpillReader(_))
  val inMemBuffered = inMemory.buffered
  (0 until numPartitions).iterator.map { p =>
    val inMemIterator = new IteratorForPartition(p, inMemBuffered)
    val iterators = readers.map(_.readNextPartition()) ++ Seq(inMemIterator)
    if (aggregator.isDefined) {
      // Perform partial aggregation across partitions
      (p, mergeWithAggregation(iterators, aggregator.get.mergeCombiners, keyComparator, ordering.isDefined))
    } else if (ordering.isDefined) {
      // No aggregator given, but we have an ordering (e.g. used by reduce tasks in sortByKey);
      // sort the elements without trying to merge them
      (p, mergeSort(iterators, ordering.get))
    } else {
      (p, iterators.iterator.flatten)
    }
  }
}

Here, one iterator is built for each of the numPartitions partitions.

For a given partition, the iterator pulls that partition's data from every spill file plus the data still in memory; if an aggregator or an ordering is defined, the aggregation or sorting is performed here, otherwise the per-source iterators are simply concatenated and returned.
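What the aggregation branch achieves can be sketched independently of Spark (this is not mergeWithAggregation itself, only its effect on already sorted per-spill streams: merge them by key and combine equal keys with mergeCombiners):

object MergeWithAggregationSketch {
  def mergeAndCombine[K: Ordering, C](
      sortedStreams: Seq[Iterator[(K, C)]],
      mergeCombiners: (C, C) => C): Iterator[(K, C)] = {
    val ord = implicitly[Ordering[K]]
    val merged = sortedStreams.flatten.sortBy(_._1)                   // merge all streams by key
    val out = scala.collection.mutable.ArrayBuffer.empty[(K, C)]
    for ((k, c) <- merged) {
      if (out.nonEmpty && ord.equiv(out.last._1, k)) {
        out(out.length - 1) = (k, mergeCombiners(out.last._2, c))     // combine equal keys
      } else {
        out += ((k, c))
      }
    }
    out.iterator
  }

  def main(args: Array[String]): Unit = {
    val result = mergeAndCombine(
      Seq(Iterator("a" -> 1, "b" -> 2), Iterator("a" -> 3)),
      (x: Int, y: Int) => x + y)
    result.foreach(println)   // (a,4), (b,2)
  }
}

The real implementation works lazily on top of a merge-sorted iterator instead of materializing the streams, but the outcome, partial aggregation across spills within one partition, is the same.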

The sorting branch, mergeSort(), is worth a closer look.

private def mergeSort(iterators: Seq[Iterator[Product2[K, C]]], comparator: Comparator[K])
    : Iterator[Product2[K, C]] = {
  val bufferedIters = iterators.filter(_.hasNext).map(_.buffered)
  type Iter = BufferedIterator[Product2[K, C]]
  val heap = new mutable.PriorityQueue[Iter]()(new Ordering[Iter] {
    // Use the reverse order because PriorityQueue dequeues the max
    override def compare(x: Iter, y: Iter): Int = comparator.compare(y.head._1, x.head._1)
  })
  heap.enqueue(bufferedIters: _*)  // Will contain only the iterators with hasNext = true
  new Iterator[Product2[K, C]] {
    override def hasNext: Boolean = !heap.isEmpty

    override def next(): Product2[K, C] = {
      if (!hasNext) {
        throw new NoSuchElementException
      }
      val firstBuf = heap.dequeue()
      val firstPair = firstBuf.next()
      if (firstBuf.hasNext) {
        heap.enqueue(firstBuf)
      }
      firstPair
    }
  }
}

Within one partition, every spill file (and the in-memory iterator) already yields its data in sorted order, so a k-way merge is enough: repeatedly taking the smallest head element among all the per-source iterators produces a fully sorted stream.
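The priority-queue trick is the same one used to merge any sorted streams; the usage sketch below (ours) also shows why the comparator is reversed, since Scala's PriorityQueue dequeues the largest element:

import scala.collection.mutable

object KWayMergeExample {
  def main(args: Array[String]): Unit = {
    type Iter = BufferedIterator[Int]
    val streams = Seq(Iterator(1, 4, 7), Iterator(2, 5, 8), Iterator(3, 6, 9)).map(_.buffered)

    // Reverse the ordering so that dequeue() pops the iterator with the *smallest* head.
    val heap = new mutable.PriorityQueue[Iter]()(Ordering.by[Iter, Int](_.head).reverse)
    heap.enqueue(streams.filter(_.hasNext): _*)

    val merged = scala.collection.mutable.ArrayBuffer.empty[Int]
    while (heap.nonEmpty) {
      val it = heap.dequeue()
      merged += it.next()
      if (it.hasNext) heap.enqueue(it)      // only re-enqueue iterators that still have data
    }
    println(merged.mkString(", "))          // 1, 2, 3, 4, 5, 6, 7, 8, 9
  }
}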

Finally, an already sorted iterator is obtained for every partition, and the partitions are written into the final file one after another.

Back in write(), the last step is writeIndexFileAndCommit(), which produces the index file that accompanies the data file.

The index file itself is simple: it just records the offset at which each partition starts inside the data file.

Utils.tryWithSafeFinally {
  // We take in lengths of each block, need to convert it to offsets.
  var offset = 0L
  out.writeLong(offset)
  for (length <- lengths) {
    offset += length
    out.writeLong(offset)
  }
}
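Put differently, the index file stores numPartitions + 1 cumulative offsets, and a reducer that wants partition p reads the byte range [offsets(p), offsets(p + 1)) of the data file. A hedged sketch of that bookkeeping (ours, with made-up lengths):

object IndexFileSketch {
  def main(args: Array[String]): Unit = {
    // Segment lengths produced by writePartitionedFile() for three partitions.
    val lengths = Array(100L, 0L, 250L)

    // What the loop above writes: 0 followed by the running totals.
    val offsets = lengths.scanLeft(0L)(_ + _)       // 0, 100, 100, 350

    val p = 2
    println(s"partition $p occupies bytes [${offsets(p)}, ${offsets(p + 1)}) of the data file")
  }
}

With the index committed and the MapStatus reported back to the driver, the map task's shuffle write is complete.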
