Tungsten简介

tungsten-sort这个名字作为一种排序方法,听起来有点怪异。下面简单介绍一下Tungsten。

Project Tungsten(“钨丝计划”)是DataBricks在3~4年前提出的Spark优化方案。由于Spark主要用Scala来开发,底层依赖JVM,因此不可避免地带来一些额外的性能开销(所谓overhead)。Tungsten致力于优化Spark的CPU与内存效率,主要有三方面:

  • 显式内存管理与基于二进制的处理:由Spark应用自己管理(序列化的)对象和内存,消除JVM对象模型和GC等带来的overhead;
  • 对缓存有感知的计算:提出高效的、能充分利用计算机存储体系的算法和数据结构;
  • 代码生成技术:充分利用最新的编译器和CPU的特性,提高运行效率。

关于它的详情,可以参看DataBricks的官方说明,以及Tungsten对应的Jira issue。

目前,Tungsten在Spark SQL方面的应用最广泛。在其他方面,tungsten-sort shuffle就是比较重要的应用。从上面我们可以感觉到,tungsten-sort shuffle与前面的两种方式区别非常大,也会更难理解。下面来具体探索它的shuffle write细节,注释会写得尽量详细一点。

shuffle write入口

#1 - o.a.s.shuffle.sort.UnsafeShuffleWriter.write()方法

类名之所以叫UnsafeShuffleWriter,是因为Tungsten内部使用了很多sun.misc.Unsafe的API。

  @Overridepublic void write(scala.collection.Iterator<Product2<K, V>> records) throws IOException {// Keep track of success so we know if we encountered an exception// We do this rather than a standard try/catch/re-throw to handle// generic throwables.boolean success = false;try {while (records.hasNext()) {//【#2 - 将shuffle数据插入排序器ShuffleExternalSorter进行处理】insertRecordIntoSorter(records.next());}//【#9 - 合并与写输出文件】closeAndWriteOutput();success = true;} finally {if (sorter != null) {try {sorter.cleanupResources();} catch (Exception e) {// Only throw this error if we won't be masking another// error.if (success) {throw e;} else {logger.error("In addition to a failure during writing, we failed during " +"cleanup.", e);}}}}}

看起来好像和SortShuffleWriter是一样的套路,甚至还更简单?那是不可能的。

序列化

#2 - o.a.s.shuffle.sort.UnsafeShuffleWriter.open()与insertRecordIntoSorter()方法

  //【这个方法用来初始化insertRecordIntoSorter()用到的东西】private void open() {assert (sorter == null);//【sorter是ShuffleExternalSorter类的实例】sorter = new ShuffleExternalSorter(memoryManager,blockManager,taskContext,initialSortBufferSize,partitioner.numPartitions(),sparkConf,writeMetrics);//【MyByteArrayOutputStream类是ByteArrayOutputStream的简单封装,只是将内部byte[]数组暴露出来】//【DEFAULT_INITIAL_SER_BUFFER_SIZE常量值是1024 * 1024,即缓冲区初始1MB大】serBuffer = new MyByteArrayOutputStream(DEFAULT_INITIAL_SER_BUFFER_SIZE);//【获取序列化流SerializationStream】serOutputStream = serializer.serializeStream(serBuffer);}@VisibleForTestingvoid insertRecordIntoSorter(Product2<K, V> record) throws IOException {assert(sorter != null);//【获取到键和分区】final K key = record._1();final int partitionId = partitioner.getPartition(key);serBuffer.reset();//【序列化键和值,将它们都当做Object写入缓冲区】serOutputStream.writeKey(key, OBJECT_CLASS_TAG);serOutputStream.writeValue(record._2(), OBJECT_CLASS_TAG);serOutputStream.flush();//【取得序列化数据的大小】final int serializedRecordSize = serBuffer.size();assert (serializedRecordSize > 0);//【#3 - 将序列化之后的二进制数据插入ShuffleExternalSorter处理】//【BYTE_ARRAY_OFFSET对应Unsafe中的native方法arrayBaseOffset()】sorter.insertRecord(serBuffer.getBuf(), Platform.BYTE_ARRAY_OFFSET, serializedRecordSize, partitionId);}

这样一来,shuffle数据就已经序列化了,在shuffle write阶段结束前,都将在这些序列化的数据上操作,因此使用tungsten-sort的其中一条限制就是完全没有聚合操作。

下面会涉及到一些类似C语言风格的、偏底层的逻辑。这倒是符合Tungsten的“显式内存管理与基于二进制的处理”特征。

那么,向ShuffleExternalSorter插入数据之后又发生了什么呢?

内存缓存与磁盘溢写

#3 - o.a.s.shuffle.sort.ShuffleExternalSorter.insertRecord()方法

  /*** Write a record to the shuffle sorter.*/public void insertRecord(Object recordBase, long recordOffset, int length, int partitionId)throws IOException {// for tests//【inMemSorter在之前已经初始化过了。顾名思义,它是个内存排序器】//【this.inMemSorter = new ShuffleInMemorySorter(this, initialSize, conf.getBoolean("spark.shuffle.sort.useRadixSort", true));】assert(inMemSorter != null);//【如果已经有超过spark.shuffle.spill.numElementsForceSpillThreshold条数据】if (inMemSorter.numRecords() >= numElementsForSpillThreshold) {logger.info("Spilling data because number of spilledRecords crossed the threshold " +numElementsForSpillThreshold);//【#5 - 直接溢写磁盘】spill();}//【#4 - 检查是否需要对排序缓存扩容及溢写】//【注意这个“pointerArray”,指针数组,是inMemSorter里的东西。看到本方法最后一句就能明白一点】growPointerArrayIfNecessary();// Need 4 bytes to store the record length.//【数据长度是int型,占额外4个字节】final int required = length + 4;//【如果需要更多内存,向TaskMemoryManager申请新的页】//【注意"页"(page)这个概念。Tungsten的内存管理与操作系统中的分页内存管理非常像】acquireNewPageIfNecessary(required);assert(currentPage != null);//【currentPage是一个MemoryBlock实例,它又是MemoryLocation的子类】//【由于引用传递,这里的base和下面的recordBase都代表一个Object在堆内的基地址】final Object base = currentPage.getBaseObject();//【#6 - 将当前这条数据的逻辑内存地址(页号+偏移量)编码成一个长整型】final long recordAddress = taskMemoryManager.encodePageNumberAndOffset(currentPage, pageCursor);//【先写长度值,并移动指针。Platform类中几乎所有方法都是直接调用unsafe API】Platform.putInt(base, pageCursor, length);pageCursor += 4;//【再写序列化之后的数据】Platform.copyMemory(recordBase, recordOffset, base, pageCursor, length);pageCursor += length;//【#7 - 将分区ID和编码的逻辑地址(“指针”)送给ShuffleInMemorySorter进行排序】inMemSorter.insertRecord(recordAddress, partitionId);}

这段代码信息量大而晦涩,不过我们可以清楚地知道,在tungsten-sort机制中,也存在数据缓存和溢写,这与sort shuffle是类似的。但是,这里不再借助像PartitionedPairBuffer之类的高级数据结构,而是由程序自己完成,并且是直接操作内存空间。另外,真正完成排序工作的是最后出场的ShuffleInMemorySorter。

代码注释中提到的“分页内存管理”是大学操作系统课程中会讲到的东西,下面只用一幅图来简单展示下。

分页内存管理逻辑地址到物理地址的转换

如果不了解或者忘记了逻辑地址、物理地址、页表、快表(TLB)等概念的话,可以参看操作系统方面的书籍,如Abraham Silberschatz所著《操作系统概念》(Operating System Concepts)。这也是我上大学时用的教材,非常好。

往回拢一拢,看看tungsten-sort机制中是如何溢写的。

#4 - o.a.s.shuffle.sort.ShuffleExternalSorter.growPointerArrayIfNecessary()方法

  /*** Checks whether there is enough space to insert an additional record in to the sort pointer* array and grows the array if additional space is required. If the required space cannot be* obtained, then the in-memory data will be spilled to disk.*/private void growPointerArrayIfNecessary() throws IOException {assert(inMemSorter != null);//【pointerArray缓存中写不下新数据了】if (!inMemSorter.hasSpaceForAnotherRecord()) {//【取得当前内存占用量】long used = inMemSorter.getMemoryUsage();LongArray array;try {// could trigger spilling//【试图给缓存分配原来两倍大的容量。除以8是因为long占64bit】array = allocateArray(used / 8 * 2);} catch (TooLargePageException e) {// The pointer array is too big to fix in a single page, spill.//【#5 - 如果分配内存超出了一页的限制,就直接溢写】spill();return;} catch (SparkOutOfMemoryError e) {// should have trigger spilling//【如果OOM了,就是扩容与溢写都没成功,抛异常出去】if (!inMemSorter.hasSpaceForAnotherRecord()) {logger.error("Unable to grow the pointer array");throw e;}return;}// check if spilling is triggered or notif (inMemSorter.hasSpaceForAnotherRecord()) {//【如果上面一波操作过后又有了剩余空间,表示已经溢写了,没必要扩容。释放掉刚才分配的内存】freeArray(array);} else {//【否则就是没溢写,真正去扩容】//【这个方法非常简单,就是直接把原来的内容复制到新数组,然后free掉原来的空间】inMemSorter.expandPointerArray(array);}}}

与sort shuffle机制类似,在溢写之前仍然会先申请内存扩容,不过这里会受到页大小的限制。那么一页最大是多少呢?在PackedRecordPointer类中定义有常量:

static final int MAXIMUM_PAGE_SIZE_BYTES = 1 << 27;

static final int MAXIMUM_PARTITION_ID = (1 << 24) - 1;

可见,一页最大是2^27字节,即128MB。之前提到的最大分区数目也是在这里定义的。另外由于页大小的限制,序列化的数据单条也不能超过128MB,这是前述三个限制条件之外的第四个条件。

上面代码中的内存操作与C语言中的malloc()/free()非常相似,底层都是依赖TaskMemoryManager类来管理的。之后在讨论Tungsten内存管理机制时,会着重分析它的源码。

下面来看具体的溢写方法。

#5 - o.a.s.shuffle.sort.ShuffleExternalSorter.spill()与writeSortedFile()方法

  /*** Sort and spill the current records in response to memory pressure.*/@Overridepublic long spill(long size, MemoryConsumer trigger) throws IOException {if (trigger != this || inMemSorter == null || inMemSorter.numRecords() == 0) {return 0L;}logger.info("Thread {} spilling sort data of {} to disk ({} {} so far)",Thread.currentThread().getId(),Utils.bytesToString(getMemoryUsage()),spills.size(),spills.size() > 1 ? " times" : " time");//【将数据按序写入文件】//【boolean参数表示是否为最终的输出文件。这里为false,表示是溢写文件】writeSortedFile(false);final long spillSize = freeMemory();inMemSorter.reset();// Reset the in-memory sorter's pointer array only after freeing up the memory pages holding the// records. Otherwise, if the task is over allocated memory, then without freeing the memory// pages, we might not be able to get memory for the pointer array.taskContext.taskMetrics().incMemoryBytesSpilled(spillSize);return spillSize;}/*** Sorts the in-memory records and writes the sorted records to an on-disk file.* This method does not free the sort data structures.** @param isLastFile if true, this indicates that we're writing the final output file and that the*                   bytes written should be counted towards shuffle spill metrics rather than*                   shuffle write metrics.*/private void writeSortedFile(boolean isLastFile) {final ShuffleWriteMetrics writeMetricsToUse;//【判断是最终输出文件还是溢写文件,然后更新不同的指标】if (isLastFile) {// We're writing the final non-spill file, so we _do_ want to count this as shuffle bytes.writeMetricsToUse = writeMetrics;} else {// We're spilling, so bytes written should be counted towards spill rather than write.// Create a dummy WriteMetrics object to absorb these metrics, since we don't want to count// them towards shuffle bytes written.writeMetricsToUse = new ShuffleWriteMetrics();}// This call performs the actual sort.//【#6 - 用inMemSorter排序,返回排序结果的"迭代器"(不是Java内部的迭代器,是单独实现的,像指针)】final ShuffleInMemorySorter.ShuffleSorterIterator sortedRecords =inMemSorter.getSortedIterator();// Small writes to DiskBlockObjectWriter will be fairly inefficient. Since there doesn't seem to// be an API to directly transfer bytes from managed memory to the disk writer, we buffer// data through a byte array. This array does not need to be large enough to hold a single// record;//【创建一个写入缓存作为内存与DiskBlockObjectWriter之间的中转】//【diskWriteBufferSize大小是SHUFFLE_DISK_WRITE_BUFFER_SIZE常量,即1MB】final byte[] writeBuffer = new byte[diskWriteBufferSize];// Because this output will be read during shuffle, its compression codec must be controlled by// spark.shuffle.compress instead of spark.shuffle.spill.compress, so we need to use// createTempShuffleBlock here; see SPARK-3426 for more details.//【创建临时的shuffle块】final Tuple2<TempShuffleBlockId, File> spilledFileInfo =blockManager.diskBlockManager().createTempShuffleBlock();final File file = spilledFileInfo._2();final TempShuffleBlockId blockId = spilledFileInfo._1();final SpillInfo spillInfo = new SpillInfo(numPartitions, file, blockId);// Unfortunately, we need a serializer instance in order to construct a DiskBlockObjectWriter.// Our write path doesn't actually use this serializer (since we end up calling the `write()`// OutputStream methods), but DiskBlockObjectWriter still calls some methods on it. To work// around this, we pass a dummy no-op serializer.//【构造DiskBlockObjectWriter时必须要一个序列化器,所以这里新建一个dummy(不做任何转化的)序列化器】final SerializerInstance ser = DummySerializerInstance.INSTANCE;final DiskBlockObjectWriter writer =blockManager.getDiskWriter(blockId, file, ser, fileBufferSizeBytes, writeMetricsToUse);int currentPartition = -1;while (sortedRecords.hasNext()) {//【按分区遍历已经排好序的数据】sortedRecords.loadNext();final int partition = sortedRecords.packedRecordPointer.getPartitionId();assert (partition >= currentPartition);if (partition != currentPartition) {// Switch to the new partition//【检查分区号如果发生了变化,就先提交当前分区】if (currentPartition != -1) {//【提交写操作,得到FileSegment,并记录分区的大小】final FileSegment fileSegment = writer.commitAndGet();spillInfo.partitionLengths[currentPartition] = fileSegment.length();}//【然后换到下一个分区继续】currentPartition = partition;}//【取得指针,再通过指针取得页号与偏移量,就得到了数据的起始地址和长度】final long recordPointer = sortedRecords.packedRecordPointer.getRecordPointer();final Object recordPage = taskMemoryManager.getPage(recordPointer);final long recordOffsetInPage = taskMemoryManager.getOffsetInPage(recordPointer);//【先取得数据前面存储的长度,然后让指针跳过它】int dataRemaining = Platform.getInt(recordPage, recordOffsetInPage);long recordReadPosition = recordOffsetInPage + 4; // skip over record lengthwhile (dataRemaining > 0) {//【将数据拷贝到上面创建的缓存中,通过缓存转到DiskBlockObjectWriter】final int toTransfer = Math.min(diskWriteBufferSize, dataRemaining);Platform.copyMemory(recordPage, recordReadPosition, writeBuffer, Platform.BYTE_ARRAY_OFFSET, toTransfer);//【写数据】writer.write(writeBuffer, 0, toTransfer);//【移动指针,更新余量】recordReadPosition += toTransfer;dataRemaining -= toTransfer;}writer.recordWritten();}//【提交最后一个分区】final FileSegment committedSegment = writer.commitAndGet();writer.close();// If `writeSortedFile()` was called from `closeAndGetSpills()` and no records were inserted,// then the file might be empty. Note that it might be better to avoid calling// writeSortedFile() in that case.if (currentPartition != -1) {//【记录溢写文件的列表】spillInfo.partitionLengths[currentPartition] = committedSegment.length();spills.add(spillInfo);}if (!isLastFile) {  // i.e. this is a spill file//【原注释太长太长了,略去】//【如果是溢写文件,更新溢写的记录指标】writeMetrics.incRecordsWritten(writeMetricsToUse.recordsWritten());taskContext.taskMetrics().incDiskBytesSpilled(writeMetricsToUse.bytesWritten());}}

可见,在溢写之前仍然先对数据排序,并且一定会按分区号有序。每趟溢写产生的文件数与分区数相同,这点与基本sort shuffle不一样,sort shuffle的写批次是有参数控制的。

至此,溢写逻辑就完成了。那么,数据是以什么形式进入ShuffleInMemorySorter的,具体排序逻辑又是怎么样的?下面来看。

数据插入和排序

先来看数据是如何定址的。

#6 - o.a.s.memory.TaskMemoryManager.encodePageNumberAndOffset()方法

  /** The number of bits used to address the page table. */private static final int PAGE_NUMBER_BITS = 13;/** The number of bits used to encode offsets in data pages. */@VisibleForTestingstatic final int OFFSET_BITS = 64 - PAGE_NUMBER_BITS;  // 51/*** Given a memory page and offset within that page, encode this address into a 64-bit long.* This address will remain valid as long as the corresponding page has not been freed.** @param page a data page allocated by {@link TaskMemoryManager#allocatePage}/* @param offsetInPage an offset in this page which incorporates the base offset. In other words,*                     this should be the value that you would pass as the base offset into an*                     UNSAFE call (e.g. page.baseOffset() + something).* @return an encoded page address.*/public long encodePageNumberAndOffset(MemoryBlock page, long offsetInPage) {//【如果开启了堆外内存,由于在堆外不存在对象的基地址,所以偏移量也是绝对地址,有可能会非常大】//【故需要减去当前页的基地址,变成相对的】if (tungstenMemoryMode == MemoryMode.OFF_HEAP) {// In off-heap mode, an offset is an absolute address that may require a full 64 bits to// encode. Due to our page size limitation, though, we can convert this into an offset that's// relative to the page's base offset; this relative offset will fit in 51 bits.offsetInPage -= page.getBaseOffset();}//【当然我们现在主要考虑堆内内存的情况】return encodePageNumberAndOffset(page.pageNumber, offsetInPage);}@VisibleForTestingpublic static long encodePageNumberAndOffset(int pageNumber, long offsetInPage) {assert (pageNumber >= 0) : "encodePageNumberAndOffset called with invalid page";//【页号左移51位,然后拼上偏移量与上一个低51bit都为1的掩码(0x7FFFFFFFFFFFFL)】return (((long) pageNumber) << OFFSET_BITS) | (offsetInPage & MASK_LONG_LOWER_51_BITS);}

从这段代码中,我们可以看出Tungsten中逻辑地址的表示方式:高13bit为页号,低51bit为偏移量。可是,前面已经说过一页的最大容量是128MB,也就是说偏移量只占用了27bit,剩下的24bit哪里去了?答案是给了分区号来使用,下面马上会看到。

#7 - o.a.s.shuffle.sort.ShuffleInMemorySorter.insertRecord()与PackedRecordPointer.packPointer()方法

  /*** Inserts a record to be sorted.** @param recordPointer a pointer to the record, encoded by the task memory manager. Due to*                      certain pointer compression techniques used by the sorter, the sort can*                      only operate on pointers that point to locations in the first*                      {@link PackedRecordPointer#MAXIMUM_PAGE_SIZE_BYTES} bytes of a data page.* @param partitionId the partition id, which must be less than or equal to*                    {@link PackedRecordPointer#MAXIMUM_PARTITION_ID}.*/public void insertRecord(long recordPointer, int partitionId) {if (!hasSpaceForAnotherRecord()) {throw new IllegalStateException("There is no space for new record");}//【这里的array就是前文提到过的pointerArray,是LongArray类型的,里面存储的是"打包"过的指针】array.set(pos, PackedRecordPointer.packPointer(recordPointer, partitionId));pos++;}/*** Pack a record address and partition id into a single word.** @param recordPointer a record pointer encoded by TaskMemoryManager.* @param partitionId a shuffle partition id (maximum value of 2^24).* @return a packed pointer that can be decoded using the {@link PackedRecordPointer} class.*/public static long packPointer(long recordPointer, int partitionId) {assert (partitionId <= MAXIMUM_PARTITION_ID);// Note that without word alignment we can address 2^27 bytes = 128 megabytes per page.// Also note that this relies on some internals of how TaskMemoryManager encodes its addresses.//【所谓“打包”其实是压缩。将页号右移了24位,然后与低27位拼在一起,逻辑地址就被压缩到了40位】final long pageNumber = (recordPointer & MASK_LONG_UPPER_13_BITS) >>> 24;final long compressedAddress = pageNumber | (recordPointer & MASK_LONG_LOWER_27_BITS);//【将分区号放在空出来的高位上】return (((long) partitionId) << 40) | compressedAddress;}

由此可见,进入ShuffleInMemorySorter中待排序的指针都符合下图的格式。

tungsten-sort指针位图

既然排序是对指针的排序,那么数据本身自然是不会保证有序性的。看一下排序逻辑的入口。

#8 - o.a.s.shuffle.sort.ShuffleInMemorySorter.getSortedIterator()方法

  /*** Return an iterator over record pointers in sorted order.*/public ShuffleSorterIterator getSortedIterator() {int offset = 0;//【useRadixSort由spark.shuffle.sort.useRadixSort参数控制。默认true】//【如果为true,采用最低位(LSD)优先的基数排序算法。由于分区号在最高位,因此分区是有序的】if (useRadixSort) {offset = RadixSort.sort(array, pos,PackedRecordPointer.PARTITION_ID_START_BYTE_INDEX,PackedRecordPointer.PARTITION_ID_END_BYTE_INDEX, false, false);} else {//【false就采用与sort shuffle相同的TimSort算法,在Sorter类中】MemoryBlock unused = new MemoryBlock(array.getBaseObject(),array.getBaseOffset() + pos * 8L,(array.size() - pos) * 8L);LongArray buffer = new LongArray(unused);Sorter<PackedRecordPointer, LongArray> sorter =new Sorter<>(new ShuffleSortDataFormat(buffer));//【SORT_COMPARATOR是简单的对分区ID的比较器】sorter.sort(array, 0, pos, SORT_COMPARATOR);}return new ShuffleSorterIterator(pos, array, offset);}

可见有两种排序算法,并且都保证分区号有序。默认的基数排序效率非常高,可以自行去参考它的源码,这里就不详细讲解了。

最后,还剩下文件合并与输出部分。加油。

文件合并与输出

#9 - o.a.s.shuffle.sort.UnsafeShuffleWriter.closeAndWriteOutput()方法

  @VisibleForTestingvoid closeAndWriteOutput() throws IOException {assert(sorter != null);//【更新峰值内存用量】updatePeakMemoryUsed();//【取得所有溢写文件,然后释放掉缓存、输出流和排序器】serBuffer = null;serOutputStream = null;final SpillInfo[] spills = sorter.closeAndGetSpills();sorter = null;final long[] partitionLengths;//【创建输出数据文件和临时数据文件】final File output = shuffleBlockResolver.getDataFile(shuffleId, mapId);final File tmp = Utils.tempFileWith(output);try {try {//【#10 - 合并溢写文件到临时输出文件】partitionLengths = mergeSpills(spills, tmp);} finally {//【然后删除它们】for (SpillInfo spill : spills) {if (spill.file.exists() && ! spill.file.delete()) {logger.error("Error while deleting spill file {}", spill.file.getPath());}}}//【创建索引文件。这个方法在之前有详细的解释过】shuffleBlockResolver.writeIndexFileAndCommit(shuffleId, mapId, partitionLengths, tmp);} finally {if (tmp.exists() && !tmp.delete()) {logger.error("Error while deleting temp file {}", tmp.getAbsolutePath());}}//【填充MapStatus结果】mapStatus = MapStatus$.MODULE$.apply(blockManager.shuffleServerId(), partitionLengths);}

#10 - o.a.s.shuffle.sort.UnsafeShuffleWriter.mergeSpills()方法

  /*** Merge zero or more spill files together, choosing the fastest merging strategy based on the* number of spills and the IO compression codec.** @return the partition lengths in the merged file.*/private long[] mergeSpills(SpillInfo[] spills, File outputFile) throws IOException {//【该参数用来控制是否做shuffle阶段的压缩】final boolean compressionEnabled = sparkConf.getBoolean("spark.shuffle.compress", true);final CompressionCodec compressionCodec = CompressionCodec$.MODULE$.createCodec(sparkConf);//【该参数用来控制是否要做快速的合并】final boolean fastMergeEnabled =sparkConf.getBoolean("spark.shuffle.unsafe.fastMergeEnabled", true);//【但上面的还不够,还得判断是否支持快速合并。如果shuffle阶段不启用压缩,或者启用了lz4/lzf/snappy/zstd压缩,就支持】final boolean fastMergeIsSupported = !compressionEnabled ||CompressionCodec$.MODULE$.supportsConcatenationOfSerializedStreams(compressionCodec);final boolean encryptionEnabled = blockManager.serializerManager().encryptionEnabled();try {if (spills.length == 0) {//【如果根本没有溢写文件,写一个空文件】new FileOutputStream(outputFile).close(); // Create an empty filereturn new long[partitioner.numPartitions()];} else if (spills.length == 1) {//【如果只有一个溢写文件,就直接将它合并进输出文件中】// Here, we don't need to perform any metrics updates because the bytes written to this// output file would have already been counted as shuffle bytes written.Files.move(spills[0].file, outputFile);return spills[0].partitionLengths;} else {//【如果有多个溢写文件】final long[] partitionLengths;//【英文注释过长,是关于metrics的,略去】if (fastMergeEnabled && fastMergeIsSupported) {// Compression is disabled or we are using an IO compression codec that supports// decompression of concatenated compressed streams, so we can perform a fast spill merge// that doesn't need to interpret the spilled bytes.//【如果启用并支持快速合并,并且启用了上一篇文章中的transferTo机制,还没有加密】//【就使用NIO zero-copy来合并到输出文件】if (transferToEnabled && !encryptionEnabled) {logger.debug("Using transferTo-based fast merge");partitionLengths = mergeSpillsWithTransferTo(spills, outputFile);} else {//【如果不用transferTo的话,就使用不经压缩的BIO FileStream来合并到输出文件】logger.debug("Using fileStream-based fast merge");partitionLengths = mergeSpillsWithFileStream(spills, outputFile, null);}} else {//【如果不启用或不支持快速合并,就使用压缩的BIO FileStream来合并到输出文件。这个方式比较"慢"】logger.debug("Using slow merge");partitionLengths = mergeSpillsWithFileStream(spills, outputFile, compressionCodec);}// When closing an UnsafeShuffleExternalSorter that has already spilled once but also has// in-memory records, we write out the in-memory records to a file but do not count that// final write as bytes spilled (instead, it's accounted as shuffle write). The merge needs// to be counted as shuffle write, but this will lead to double-counting of the final// SpillInfo's bytes.writeMetrics.decBytesWritten(spills[spills.length - 1].file.length());writeMetrics.incBytesWritten(outputFile.length());//【返回分区大小,索引文件要用到】return partitionLengths;}} catch (IOException e) {if (outputFile.exists() && !outputFile.delete()) {logger.error("Unable to delete output file {}", outputFile.getPath());}throw e;}}

可见,tungsten-sort shuffle的文件合并方式与bypass sort shuffle的方式比较类似,有NIO和BIO两种方式,但判断条件更严格一些。至此,整个tungsten-sort shuffle write流程就结束了。

总结

下图来自https://0x0fff.com/spark-architecture-shuffle/。

tungsten-sort shuffle write流程简图

暂未涉及细节的知识点

  • Tungsten的内存管理机制详情
  • NIO在shuffle处理中的应用

Spark Tungsten-sort shuffle write流程解析相关推荐

  1. Spark基本sort shuffle write流程解析

    shuffle write入口 先回忆一下基础知识: Spark作业执行的单元从高到低为job→stage→task stage分为ShuffleMapStage与ResultStage,task也分 ...

  2. Spark SQL之queryExecution运行流程解析Logical Plan(三)

    1.整体运行流程 使用下列代码对SparkSQL流程进行分析,让大家明白LogicalPlan的几种状态,理解SparkSQL整体执行流程 // sc is an existing SparkCont ...

  3. spark源码-shuffle原理分析-1-ShuffleWriter

    ShuffleWriter 1.概述 2.ShuffleHandle注册 2.1.注册时间点 2.2.向shuffleManager注册shuffle 2.2.1.BypassMergeSortShu ...

  4. Spark源码分析之Sort-Based Shuffle读写流程

    一 概述 我们知道Spark Shuffle机制总共有三种: # 未优化的Hash Shuffle:每一个ShuffleMapTask都会为每一个ReducerTask创建一个单独的文件,总的文件数是 ...

  5. Spark SQL架构工作原理及流程解析

    Spark SQL架构工作原理及流程解析,spark sql从shark发展而来,Shark为了实现Hive兼容,在HQL方面重用了Hive中HQL的解析.逻辑执行计划翻译.执行计划优化等逻辑. Sp ...

  6. 第36课:kaishi 彻底解密Spark 2.1.X中Sort Shuffle中Reducer端源码内幕

    第36课:kaishi 彻底解密Spark 2.1.X中Sort Shuffle中Reducer端源码内幕 本文根据家林大神系列课程编写 http://weibo.com/ilovepains 本课讲 ...

  7. Spark架构与作业执行流程简介

    2019独角兽企业重金招聘Python工程师标准>>> Spark架构与作业执行流程简介 博客分类: spark Local模式 运行Spark最简单的方法是通过Local模式(即伪 ...

  8. SpringMVC 执行流程解析

    SpringMVC 执行流程解析 注:SpringMVC 版本 5.2.15 上面这张图许多人都看过,本文试图从源码的角度带大家分析一下该过程. 1. ContextLoaderListener 首先 ...

  9. Spring Cloud Gateway (七)处理流程解析

    Spring Cloud Gateway (七)处理流程解析 简介     初步梳理 Spring Cloud Gateway 的处理流程 过程记录 主要请求流程     在前面的分析中,我们知道在 ...

最新文章

  1. 计算机组成与体系结构-----数制
  2. 三种权重的初始化方法
  3. HarmonyOS快速开发入门
  4. (收藏)Turbo C 2.0、Borland C++库函数及用例
  5. C#相等性 - 三个方法和一个接口
  6. 给 QtCtreator 工程文件 pro 配置 pthread库和liburcu库
  7. css与jquery、图标字体、常用数据
  8. 【leetcode】栈(python)
  9. 从 VDN 到 QMIX的学习笔记
  10. 日照分析的计算机精度,日照分析计算精度(时间间隔)的粗探
  11. JAVA之假克隆、浅克隆、深克隆
  12. 数模【Mathematica(安装、入门方法、基本计算、基本图形、创建互动模型、利用数据、幻灯片演示、完整实例)】
  13. HTML 笔记/案例
  14. 小程序支付后台实现(服务商)
  15. 理解 Linux 网络栈:Linux 网络协议栈简单总结
  16. 2021秋招河南联通面经
  17. stc8a控制MG90S舵机
  18. 安卓开发者中心!那些BAT大厂的Android面试官到底在想些什么?持续更新中
  19. 判断对象中属性值是否全为空
  20. 连接型CRM助力医疗企业把“成本中心”变成“利润中心”

热门文章

  1. 在html如何将链接隐藏,网页查看隐藏链接的方法和检测工具(附:网站链接隐藏的方法)...
  2. CANoe自动化测试的配置方式总结分析(三)——SystemVariables数组方式
  3. 交换机的电口和光口到底是个啥东东,做网络的这个常识得懂
  4. 学习Go语言必备案例 (3)
  5. iOS逆向 越狱和安卓手机Root的区别
  6. 云枢子表里放入富文本,富文本里放a标签,点击无法触发事件问题
  7. 给git提交加点表情
  8. 杀毒软件选择的最大误区
  9. 单板计算机图片大全,2021年全球10大最佳单板计算机开发板(SBC)(第4-6名)(图文)...
  10. TextView上使用inputType=“textMultiLine“问题