
/**
 * Writes all of `records` to this map task's shuffle output.
 *
 * A sorter is chosen from `dep.mapSideCombine`: with map-side combine it receives the
 * dependency's aggregator and key ordering, otherwise neither. After every record has been
 * inserted, the sorter writes one partitioned file to a temporary location, and that file plus
 * the per-partition lengths are handed to `shuffleBlockResolver.writeIndexFileAndCommit`.
 * `mapStatus` is then set from those lengths. The temporary file is removed in `finally` if it
 * still exists.
 *
 * @param records key/value pairs produced by the map task
 */
override def write(records: Iterator[Product2[K, V]]): Unit = {
  sorter = if (dep.mapSideCombine) {
    new ExternalSorter[K, V, C](
      context, dep.aggregator, Some(dep.partitioner), dep.keyOrdering, dep.serializer)
  } else {
    // In this case we pass neither an aggregator nor an ordering to the sorter, because we don't
    // care whether the keys get sorted in each partition; that will be done on the reduce side
    // if the operation being run is sortByKey.
    new ExternalSorter[K, V, V](
      context, aggregator = None, Some(dep.partitioner), ordering = None, dep.serializer)
  }
  sorter.insertAll(records)

  // Don't bother including the time to open the merged output file in the shuffle write time,
  // because it just opens a single file, so is typically too fast to measure accurately
  // (see SPARK-3570).
  val output = shuffleBlockResolver.getDataFile(dep.shuffleId, mapId)
  val tmp = Utils.tempFileWith(output)
  try {
    val blockId = ShuffleBlockId(dep.shuffleId, mapId, IndexShuffleBlockResolver.NOOP_REDUCE_ID)
    val partitionLengths = sorter.writePartitionedFile(blockId, tmp)
    shuffleBlockResolver.writeIndexFileAndCommit(dep.shuffleId, mapId, partitionLengths, tmp)
    mapStatus = MapStatus(blockManager.shuffleServerId, partitionLengths,
      writeMetrics.recordsWritten)
  } finally {
    // Best-effort cleanup: only log when the temp file is still present and cannot be deleted.
    if (tmp.exists() && !tmp.delete()) {
      logError(s"Error while deleting temp file ${tmp.getAbsolutePath}")
    }
  }
}


/**
 * Inserts every record from `records` into the in-memory collection, spilling when needed.
 *
 * When an aggregator is defined, values are combined per (partition, key) in `map`;
 * otherwise records are appended unchanged to `buffer`. After each record
 * `maybeSpillCollection` is called so the active collection can be spilled.
 *
 * @param records key/value pairs to insert
 */
def insertAll(records: Iterator[Product2[K, V]]): Unit = {
  // TODO: stop combining if we find that the reduction factor isn't high
  val shouldCombine = aggregator.isDefined

  if (shouldCombine) {
    // Combine values in-memory first using our AppendOnlyMap
    val mergeValue = aggregator.get.mergeValue
    val createCombiner = aggregator.get.createCombiner
    // `kv` is a var shared with the `update` closure so the closure is allocated only once.
    var kv: Product2[K, V] = null
    val update = (hadValue: Boolean, oldValue: C) => {
      if (hadValue) mergeValue(oldValue, kv._2) else createCombiner(kv._2)
    }
    while (records.hasNext) {
      addElementsRead()
      kv = records.next()
      map.changeValue((getPartition(kv._1), kv._1), update)
      maybeSpillCollection(usingMap = true)
    }
  } else {
    // Stick values into our buffer
    while (records.hasNext) {
      addElementsRead()
      val kv = records.next()
      // No aggregation on this path, so the value is stored as-is under the combiner type.
      buffer.insert(getPartition(kv._1), kv._1, kv._2.asInstanceOf[C])
      maybeSpillCollection(usingMap = false)
    }
  }
}



// In-memory collections holding records before they are spilled. `insertAll` uses `map` on the
// combining path (usingMap = true) and `buffer` otherwise; `maybeSpillCollection` replaces
// whichever one was spilled with a fresh instance.
@volatile private var map = new PartitionedAppendOnlyMap[K, C]
@volatile private var buffer = new PartitionedPairBuffer[K, C]


/**
 * Appends one (partition, key) -> value entry to the end of the buffer.
 *
 * Entries occupy two adjacent slots of `data`: the (partition, key) pair at index
 * `2 * curSize` and the value at `2 * curSize + 1`. The array is grown first when full.
 *
 * @param partition partition ID of the record
 * @param key       record key
 * @param value     record value
 */
def insert(partition: Int, key: K, value: V): Unit = {
  if (curSize == capacity) {
    growArray()
  }
  data(2 * curSize) = (partition, key.asInstanceOf[AnyRef])
  data(2 * curSize + 1) = value.asInstanceOf[AnyRef]
  curSize += 1
  afterUpdate()
}



/**
 * Stores `value` under the composite key `(partition, key)` by delegating to `update`.
 *
 * @param partition partition ID of the record
 * @param key       record key
 * @param value     value to store for that (partition, key)
 */
def insert(partition: Int, key: K, value: V): Unit = {
  update((partition, key), value)
}
/**
 * Sets the value for `key`, inserting the key if it is not present yet.
 *
 * A null key is kept outside the table in `nullValue` / `haveNullValue`. Other keys are
 * stored in the flat `data` array, key at `2 * pos` and value at `2 * pos + 1`. Collisions
 * are resolved by open addressing with a probe step that grows by one on every miss.
 *
 * @param key   key to insert or overwrite
 * @param value value to associate with `key`
 */
def update(key: K, value: V): Unit = {
  assert(!destroyed, destructionMessage)
  val k = key.asInstanceOf[AnyRef]
  if (k.eq(null)) {
    // The null key has a dedicated slot; only the first insertion changes the size.
    if (!haveNullValue) {
      incrementSize()
    }
    nullValue = value
    haveNullValue = true
    return
  }
  var pos = rehash(key.hashCode) & mask
  var i = 1
  while (true) {
    val curKey = data(2 * pos)
    if (curKey.eq(null)) {
      // Empty slot: claim it for this key.
      data(2 * pos) = k
      data(2 * pos + 1) = value.asInstanceOf[AnyRef]
      incrementSize()  // Since we added a new key
      return
    } else if (k.eq(curKey) || k.equals(curKey)) {
      // Existing key: overwrite the value in place.
      data(2 * pos + 1) = value.asInstanceOf[AnyRef]
      return
    } else {
      // Occupied by another key: advance by a step that increases with each probe.
      val delta = i
      pos = (pos + delta) & mask
      i += 1
    }
  }
}



/**
 * Spills the current in-memory collection to disk if `maybeSpill` decides it is needed.
 *
 * The collection that was spilled is replaced by a fresh empty instance, and
 * `_peakMemoryUsedBytes` is raised to the estimated size when that is larger.
 *
 * @param usingMap whether the active collection is `map` (true) or `buffer` (false)
 */
private def maybeSpillCollection(usingMap: Boolean): Unit = {
  var estimatedSize = 0L
  if (usingMap) {
    estimatedSize = map.estimateSize()
    if (maybeSpill(map, estimatedSize)) {
      map = new PartitionedAppendOnlyMap[K, C]
    }
  } else {
    estimatedSize = buffer.estimateSize()
    if (maybeSpill(buffer, estimatedSize)) {
      buffer = new PartitionedPairBuffer[K, C]
    }
  }

  if (estimatedSize > _peakMemoryUsedBytes) {
    _peakMemoryUsedBytes = estimatedSize
  }
}
/**
 * Spills `collection` to disk when more memory is needed and cannot be obtained, or when the
 * number of elements read exceeds `numElementsForceSpillThreshold`.
 *
 * The memory check only runs when `elementsRead` is a multiple of 32 and `currentMemory` has
 * reached `myMemoryThreshold`.
 *
 * @param collection    collection to be spilled to disk
 * @param currentMemory estimated size of the collection in bytes
 * @return true if `collection` was spilled to disk; false otherwise
 */
protected def maybeSpill(collection: C, currentMemory: Long): Boolean = {
  var shouldSpill = false
  if (elementsRead % 32 == 0 && currentMemory >= myMemoryThreshold) {
    // Claim up to double our current memory from the shuffle memory pool
    val amountToRequest = 2 * currentMemory - myMemoryThreshold
    val granted = acquireMemory(amountToRequest)
    myMemoryThreshold += granted
    // If we were granted too little memory to grow further (either tryToAcquire returned 0,
    // or we already had more memory than myMemoryThreshold), spill the current collection
    shouldSpill = currentMemory >= myMemoryThreshold
  }
  shouldSpill = shouldSpill || _elementsRead > numElementsForceSpillThreshold
  // Actually spill
  if (shouldSpill) {
    _spillCount += 1
    logSpillage(currentMemory)
    spill(collection)
    _elementsRead = 0
    _memoryBytesSpilled += currentMemory
    releaseMemory()
  }
  shouldSpill
}


/**
 * Spills the given in-memory collection to a file on disk and records that file in `spills`.
 *
 * The collection is consumed through its destructive sorted iterator (ordered with
 * `comparator`), so it must not be used again afterwards.
 *
 * @param collection in-memory collection to write out
 */
override protected[this] def spill(collection: WritablePartitionedPairCollection[K, C]): Unit = {
  val inMemoryIterator = collection.destructiveSortedWritablePartitionedIterator(comparator)
  val spillFile = spillMemoryIteratorToDisk(inMemoryIterator)
  spills += spillFile
}


/**
 * Wraps `partitionedDestructiveSortedIterator` in a [[WritablePartitionedIterator]] that
 * writes elements one at a time to a [[DiskBlockObjectWriter]].
 *
 * The returned iterator keeps one element of look-ahead in `cur`; `cur` is null once the
 * underlying iterator is exhausted.
 *
 * @param keyComparator optional key comparator passed through to the sorted iterator
 * @return an iterator exposing `writeNext`, `hasNext` and `nextPartition`
 */
def destructiveSortedWritablePartitionedIterator(keyComparator: Option[Comparator[K]])
  : WritablePartitionedIterator = {
  val it = partitionedDestructiveSortedIterator(keyComparator)
  new WritablePartitionedIterator {
    private[this] var cur = if (it.hasNext) it.next() else null

    // Writes the current element's key and value, then advances the look-ahead.
    def writeNext(writer: DiskBlockObjectWriter): Unit = {
      writer.write(cur._1._2, cur._2)
      cur = if (it.hasNext) it.next() else null
    }

    def hasNext(): Boolean = cur != null

    // Partition ID of the element that the next writeNext call will write.
    def nextPartition(): Int = cur._1._1
  }
}



/**
 * A comparator for (Int, K) pairs that orders them by only their partition ID.
 *
 * Uses `Integer.compare` instead of subtraction so the result cannot overflow for IDs that
 * are far apart; only the sign of the result is meaningful.
 */
def partitionComparator[K]: Comparator[(Int, K)] = new Comparator[(Int, K)] {
  override def compare(a: (Int, K), b: (Int, K)): Int = {
    Integer.compare(a._1, b._1)
  }
}

/**
 * A comparator for (Int, K) pairs that orders them both by their partition ID and a key ordering.
 */
def partitionKeyComparator[K](keyComparator: Comparator[K]): Comparator[(Int, K)] = {
  // Orders pairs by partition ID first; ties are broken with `keyComparator` on the key.
  new Comparator[(Int, K)] {
    override def compare(a: (Int, K), b: (Int, K)): Int = {
      // Integer.compare avoids the overflow that `a._1 - b._1` has for distant IDs.
      val partitionDiff = Integer.compare(a._1, b._1)
      if (partitionDiff != 0) {
        partitionDiff
      } else {
        keyComparator.compare(a._2, b._2)
      }
    }
  }
}




// Write the sorted output to a temp file next to the final data file, then pass the temp file
// and the per-partition lengths to writeIndexFileAndCommit.
val output = shuffleBlockResolver.getDataFile(dep.shuffleId, mapId)
val tmp = Utils.tempFileWith(output)
try {
  val blockId = ShuffleBlockId(dep.shuffleId, mapId, IndexShuffleBlockResolver.NOOP_REDUCE_ID)
  val partitionLengths = sorter.writePartitionedFile(blockId, tmp)
  shuffleBlockResolver.writeIndexFileAndCommit(dep.shuffleId, mapId, partitionLengths, tmp)
  mapStatus = MapStatus(blockManager.shuffleServerId, partitionLengths,
    writeMetrics.recordsWritten)
} finally {
  // Only log when the temp file is still present and cannot be deleted.
  if (tmp.exists() && !tmp.delete()) {
    logError(s"Error while deleting temp file ${tmp.getAbsolutePath}")
  }
}



// Write each non-empty partition through `writer` and record the length of the committed
// segment under its partition ID; empty partitions leave their `lengths` entry untouched.
for ((id, elements) <- this.partitionedIterator) {
  if (elements.hasNext) {
    for (elem <- elements) {
      writer.write(elem._1, elem._2)
    }
    val segment = writer.commitAndGet()
    lengths(id) = segment.length
  }
}



/**
 * Merges the spilled files with the in-memory data into one iterator per partition.
 *
 * For each partition ID in `0 until numPartitions`, the per-partition iterators of every spill
 * reader and of the in-memory data are combined: aggregated when an aggregator is defined,
 * merge-sorted when only an ordering is defined, and simply concatenated otherwise.
 *
 * @param spills   files previously spilled to disk
 * @param inMemory in-memory elements keyed by (partition, key)
 * @return an iterator of (partition ID, iterator over that partition's elements)
 */
private def merge(spills: Seq[SpilledFile], inMemory: Iterator[((Int, K), C)])
    : Iterator[(Int, Iterator[Product2[K, C]])] = {
  val readers = spills.map(new SpillReader(_))
  val inMemBuffered = inMemory.buffered
  (0 until numPartitions).iterator.map { p =>
    val inMemIterator = new IteratorForPartition(p, inMemBuffered)
    val iterators = readers.map(_.readNextPartition()) ++ Seq(inMemIterator)
    if (aggregator.isDefined) {
      // Perform partial aggregation across partitions
      (p, mergeWithAggregation(
        iterators, aggregator.get.mergeCombiners, keyComparator, ordering.isDefined))
    } else if (ordering.isDefined) {
      // No aggregator given, but we have an ordering (e.g. used by reduce tasks in sortByKey);
      // sort the elements without trying to merge them
      (p, mergeSort(iterators, ordering.get))
    } else {
      (p, iterators.iterator.flatten)
    }
  }
}




/**
 * Merge-sorts a sequence of (K, C) iterators using the given comparator on the keys.
 *
 * Non-empty iterators are kept in a priority queue ordered by their head key. Each call to
 * `next()` takes the iterator with the smallest head, consumes one element from it, and puts
 * it back if it still has elements.
 *
 * @param iterators  per-source iterators to merge
 * @param comparator ordering on keys
 * @return a single iterator yielding the elements of all inputs in key order
 */
private def mergeSort(iterators: Seq[Iterator[Product2[K, C]]], comparator: Comparator[K])
    : Iterator[Product2[K, C]] =
{
  val bufferedIters = iterators.filter(_.hasNext).map(_.buffered)
  type Iter = BufferedIterator[Product2[K, C]]
  val heap = new mutable.PriorityQueue[Iter]()(new Ordering[Iter] {
    // Use the reverse order because PriorityQueue dequeues the max
    override def compare(x: Iter, y: Iter): Int = comparator.compare(y.head._1, x.head._1)
  })
  heap.enqueue(bufferedIters: _*)  // Will contain only the iterators with hasNext = true
  new Iterator[Product2[K, C]] {
    override def hasNext: Boolean = !heap.isEmpty

    override def next(): Product2[K, C] = {
      if (!hasNext) {
        throw new NoSuchElementException
      }
      val firstBuf = heap.dequeue()
      val firstPair = firstBuf.next()
      // Re-queue the source only while it still has elements, so the heap never holds an
      // exhausted iterator.
      if (firstBuf.hasNext) {
        heap.enqueue(firstBuf)
      }
      firstPair
    }
  }
}





// Write the index as cumulative offsets: a leading 0, then one running total per block length.
Utils.tryWithSafeFinally {
  // We take in lengths of each block, need to convert it to offsets.
  var offset = 0L
  out.writeLong(offset)
  for (length <- lengths) {
    offset += length
    out.writeLong(offset)
  }
} {
  // Close the output stream whether or not the writes succeeded.
  out.close()
}

《Spark SortShuffleWriter 的实现》相关推荐

  1. 进阶大数据架构师学习路线

    (图片:在这里插入图片描述)文末有惊喜 大数据架 ...

  2. Spark ShuffleManager内存缓冲器SortShuffleWriter设计思路剖析-Spark商业环境实战

    本套系列博客从真实商业环境抽取案例进行总结和分享,并给出Spark源码解读及商业实战指导,请持续关注本套博客.版权声明:本套Spark源码解读及商业实战归作者(秦凯新)所有,禁止转载,欢迎学习. Sp ...

  3. spark 2.3源码分析之SortShuffleWriter

    SortShuffleWriter 概述 SortShuffleWriter它主要是判断在Map端是否需要本地进行combine操作.如果需要聚合,则使用PartitionedAppendOnlyMa ...

  4. 第34课:彻底解密Spark 2.1.X中Shuffle 中SortShuffleWriter排序源码内幕解密

    第34课:彻底解密Spark 2.1.X中Shuffle 中SortShuffleWriter排序源码内幕解密 本文根据家林大神系列课程编写 S ...

  5. sparkcore分区_Spark学习:Spark源码和调优简介 Spark Core (二)

    本文基于 Spark 2.4.4 版本的源码,试图分析其 Core 模块的部分实现原理,其中如有错误,请指正.为了简化论述,将部分细节放到了源码中作为注释,因此正文中是主要内容. 第一部分内容见: S ...

  6. Spark详解(十):SparkShuffle机制原理分析

    1. Spark Shuffle简介 在Hadoop的MapReduce框架中Shuffle是连接Map和Reduce的桥梁,Map的输出要用到Reduce中必须经过Shuffle这个环节.由于Shu ...

  7. spark shuffle的写操作之准备工作

    前言 在前三篇文章中,spark 源码分析之十九 -- DAG的生成和Stage的划分 剖析了DAG的构建和Stage的划分,spark 源码分析之二十 -- Stage的提交 剖析了TaskSet任 ...

  8. Spark PersistenceEngine持久化引擎与领导选举代理机制内核原理深入剖析-Spark商业环境实战...

    本套系列博客从真实商业环境抽取案例进行总结和分享,并给出Spark源码解读及商业实战指导,请持续关注本套博客.版权声明:本套Spark源码解读及商业实战归作者(秦凯新)所有,禁止转载,欢迎学习. Sp ...

  9. Spark源码和调优简介 Spark Core

    作者:calvinrzluo,腾讯 IEG 后台开发工程师 本文基于 Spark 2.4.4 版本的源码,试图分析其 Core 模块的部分实现原理,其中如有错误,请指正.为了简化论述,将部分细节放到了 ...


  1. oracle 远程exp导出,EXP远程导出完整Oracle数据库
  2. 关于for和foreach,兼顾效率与安全
  3. 周一 周五 记录_6.12日独家提示买入的 民丰特纸,属于突破年线的股票当天,所以我们买入后小幅盈利 于今天周一6月月15日冲高逢高获利卖出...
  4. 云炬创业政策学习笔记20210111
  5. pd对焦速度_捕捉爆炸瞬间!魅蓝Note6双PD对焦速度逆天
  6. 引领架构创新之路第八届系统架构师大会撼世来袭
  7. Silverlight 中的 CoreCLR
  8. 硅谷NewGen:AI棋至拐点,推动下一代技术变革
  9. sundancest201驱动_驱动支持列表
  10. java FTP连接时出现“227 Entering Passive Mode”的解决方法
  11. 手写杀毒软件——放心的安全卫士
  12. 如何有效训练你的研究能力
  13. java阿姆斯特朗数,Java判断阿姆斯特朗数
  14. 智能体温监测预警系统方案开发
  15. java springMVC demo 事例 注解模式 例子 完整事例
  16. 未来 5 年的 5 大技术趋势
  17. 基于JAVA一日三餐信息系统计算机毕业设计源码+系统+数据库+lw文档+部署
  18. VMware P2V---从物理机到虚拟机(二)
  19. 自己在网上收集的一些Qt的小用法
  20. Scrapy从入门到精通(3)--使用Item封装数据


  1. Spring和Mybatis整合
  2. Java任务调度之Quartz快速入门
  3. 微型计算机的普通显示器通常有两组引线 即,微型计算机的显示屏通常具有两组引线,即()...
  4. Java类的加载过程,类加载器,双亲委派原则
  5. Linux下用户、组、权限操作
  6. [树状数组][哈希]JZOJ 3240 Seat
  7. 算法踩坑4-冒泡排序
  8. 个人作业-Week1
  9. 64.判断当前线程是否是主线程
  10. FastReport的动态页面设置