前面我们介绍了BypassMergeSortShuffleWriterSortShuffleWriter,知道了它们的应用场景和实现方式,本节我们来看下UnsafeShuffleWriter,它使用了Tungsten优化,排序的是二进制的数据,不会对数据进行反序列化操作,在某些情况下会加速Shuffle过程。

概述

UnsafeShuffleWriter提供了以序列化方式写入文件的方式,写入的内存既可以是on-heap也可以是off-heap,当数据到来时候,先序列化,然后使用序列化的数据排序,要想使用该Writer需要满足以下条件:

  1. ShuffleDependency的序列化器需要支持对流中输出的序列化后的对象的字节进行排序;
  2. ShuffleDependency不能指定指定聚合器,map阶段不进行聚合操作,用到了Tungsten优化,排序的是二进制的数据,不会对数据进行反序列化操作(反序列化就是转成Java对象),所以不支持aggregation;
  3. Shuffle过程产生的分区数大于16777216 (1 << 24),内部存储的PackedRecordPointer 中8字节的前24bit表示分区数目,最大是16777216。

UnsafeShuffleWriter基于序列化的二进制数据进行排序,而不是基于java object,可以节省内存和GC开销,加速Shuffle过程,接下来我们来分析下具体的实现以及依赖的组件。

依赖

MemoryLocation

tungsten-sort机制中,也存在数据缓存和溢写,这与sort shuffle是类似的。但是,这里不再借助像PartitionedPairBuffer之类的高级数据结构,而是由程序自己完成,并且是直接操作内存空间,而且可以使用堆内内存或者堆外内存,当使用堆外内存时候,由于不受JVM GC的影响,可以直接使用内存地址来进行寻址;但是使用堆内内存时候,内存地址可以由一个base对象和一个offset对象组合起来表示,但是对于一些数据结构比如在hashmap或者是sorting buffer中的记录的指针,尽管我们决定使用128位来寻址,我们不能只存base对象的地址,因为由于gc的存在,这个地址不能保证是稳定不变的。为了解决这个问题,Tungsten使用页表来管理内存,使用64位的高13位来保存内存页数,低51位来保存这个页中的offset,使用page表来保存base对象,其在page表中的索引就是该内存的内存页数。另外为了统一堆内堆外内存,统一抽象页为MemoryLocation,包含了obj对象和offset:

  1. obj: 如果是堆内存模式时,数据作为对象存储在JVM的堆上,此时的obj不为空;处于堆外内存模式时,数据存储在JVM的堆外内存中,因而不会在JVM中存在对象;
  2. offset属性主要用来定位数据,堆内存模式时,首先从堆内找到对象,然后使用offset定位数据的具体位置;堆外内存模式时,则直接使用offset从堆外内存中定位。

MemoryBlock继承自MemoryLocation代表从obj和offset定位的起始位置开始,固定长度的连续内存块,是申请来的具体的内存空间,长度由length来决定,可以作为TaskMemoryManager页表中的一个页的抽象。

TaskMemoryManager

前面在讲解SortShuffleWriter时候,执行内存的申请与释放是通过TaskMemoryManageracquireExecutionMemoryreleaseExecutionMemory进行的,UnsafeShuffleWriter为了统一堆内和堆外内存,使用页表来管理内存的申请和释放,TaskMemoryManager提供的是allocatePagefreePage,并提供了一系列寻址的方法。

页表

TaskMemoryManager用页表来管理内存,维护了一个MemoryBlock数组用于存放该TMM分配得到的pages[pageTable]逻辑地址用一个Long类型(64-bit)来表示,高13位来保存内存页数,低51位来保存这个页中的offset,使用page表来保存base对象,其在page表中的索引就是该内存的内存页数。页数最多有8192页,理论上允许索引 8192 * (2^31 -1)* 8 bytes,相当于140TB的数据。其中 2^31 -1 是整数的最大值,因为page表中记录索引的是一个long型数组,这个数组的最大长度是2^31 -1。实际上没有那么大。因为64位中除了用来设计页数和页内偏移量外还用于存放数据的分区信息。

/** 13位用来表示能存储的最大页数:8092 */
private static final int PAGE_NUMBER_BITS = 13;/** 最大页数 8092*/
private static final int PAGE_TABLE_SIZE = 1 << PAGE_NUMBER_BITS;/** 用于保存编码后的偏移量的位数。静态常量OFFSET_BITS的值为51。 */
static final int OFFSET_BITS = 64 - PAGE_NUMBER_BITS; /** 具体的内存存储是由一个数组存储,数组最大是2^32-1,数组元素是long类型,所以是8字节;所以得到最大的Page大小。静态常量MAXIMUM_PAGE_SIZE_BYTES的值为17179869176,即(2^32-1)× 8==17G。*/
public static final long MAXIMUM_PAGE_SIZE_BYTES = ((1L << 31) - 1) * 8L;/** 长整型的低51位的位掩码。静态常量MASK_LONG_LOWER_51_BITS的值为2251799813685247 */
private static final long MASK_LONG_LOWER_51_BITS = 0x7FFFFFFFFFFFFL;/* 维护一个Page表。pageTable实际为Page(即MemoryBlock)的数组,数组长度为PAGE_TABLE_SIZE。*/
private final MemoryBlock[] pageTable = new MemoryBlock[PAGE_TABLE_SIZE];

寻址

编码

将page和在page中的偏移量来编码为内存地址:

  1. 如果是堆外内存,由于是操作系统中的地址,所以直接是跟起始地址的偏移量;
  2. 如果是堆内内存,需要根据页号和偏移量进行组合得到。
public long encodePageNumberAndOffset(MemoryBlock page, long offsetInPage) {if (tungstenMemoryMode == MemoryMode.OFF_HEAP) { // Tungsten的内存模式是堆外内存// 此时的参数offsetInPage是操作系统内存的绝对地址,offsetInPage与MemoryBlock的起始地址之差就是相对于起始地址的偏移量offsetInPage -= page.getBaseOffset();}// 通过位运算将页号存储到64位长整型的高13位中,并将偏移量存储到64位长整型的低51位中,返回生成的64位的长整型。return encodePageNumberAndOffset(page.pageNumber, offsetInPage);
}// 获取页号相对于内存块起始地址的偏移量
public static long encodePageNumberAndOffset(int pageNumber, long offsetInPage) {return (((long) pageNumber) << OFFSET_BITS) | (offsetInPage & MASK_LONG_LOWER_51_BITS);
}

解码

根据逻辑地址获取页号和偏移量,页号是8字节的13位,而偏移量是低51位。

// 用于解码页号,将64位的长整型右移51位(只剩下页号),然后转换为整型以获得Page的页号。
public static int decodePageNumber(long pagePlusOffsetAddress) {// 右移51位return (int) (pagePlusOffsetAddress >>> OFFSET_BITS);
}// 解码偏移量,用于将64位的长整型与51位的掩码按位进行与运算,以获得在Page中的偏移量。
private static long decodeOffset(long pagePlusOffsetAddress) {// 与上MASK_LONG_LOWER_51_BITS掩码,即取pagePlusOffsetAddress的低51位return (pagePlusOffsetAddress & MASK_LONG_LOWER_51_BITS);
}

LongArray

LongArray本质是对MemoryBlock的封装,持有一块内存,它是通过ShuffleExternalSorter申请来的一个内存块MemoryBlock构建的,也就是申请一个页表,然后将内存空间按照8字节来进行切割,形成数组,所以能装下size / 8个元素,另外提供了添加数据以及获取数据的方法,数组里面的元素是8字节的PackedRecordPointer

// 设置数据
public void set(int index, long value) {// 插入到baseObj的指定位置,baseObj就是MemoryBlock的objPlatform.putLong(baseObj, baseOffset + index * WIDTH, value);
}// 访问数据
public long get(int index) {// 获取指定数据位位上的Long型数据return Platform.getLong(baseObj, baseOffset + index * WIDTH);
}

PackedRecordPointer

上面我们讲到LongArray的每个元素的大小为8字节,这个是因为数组内部存储的是记录指针PackedRecordPointer,它使用8字节来存储记录信息,主要分为三部分: [24 bit 分区号][13 bit 内存页号][27 bit 内存偏移量],前面我们讲到TaskMemoryManager使用前13位表示内存页号,后51位表示偏移量,这里内存页号是足量的,但内存偏移量只有27位,如果offset in page大于2^27,就会出现高位丟失的问题。所以可寻址的最大偏移量的大小为2^27 bit,即128 MB,所以一个Task的可用内存 = 总页数 * 页的最大大小 = 2^13 * 2^27 = 2^40 = 1 TB字节.

初始化

recordPointertaskMemoryManager返回的页号和相对于内存块起始地址的偏移量,partitionId是分区号,将他们两个按照上面的格式:[24 bit 分区号][13 bit 内存页号][27 bit 内存偏移量]组装起来。

  1. 首先取出来页号,recordPointer的高13位表示页号,由于页号前面24位是分区号的位置,所以需要右移24位;
  2. 提取偏移量,recordPointer的低27位是偏移量,将页号和偏移量(offset)组装起来;
  3. 最后前面24位是分区号,将分区号和页号偏移量组合起来,形成8字节的long。
// 低51位掩码
private static final long MASK_LONG_LOWER_51_BITS = (1L << 51) - 1;
// 高13位掩码
private static final long MASK_LONG_UPPER_13_BITS = ~MASK_LONG_LOWER_51_BITS;
// 低27位掩码,可以取到offset
private static final long MASK_LONG_LOWER_27_BITS = (1L << 27) - 1;public static long packPointer(long recordPointer, int partitionId) {// 取高13位,即页号final long pageNumber = (recordPointer & MASK_LONG_UPPER_13_BITS) >>> 24;// 取低27位,获取offset,将页号和偏移量进行相或,组成低40位final long compressedAddress = pageNumber | (recordPointer & MASK_LONG_LOWER_27_BITS);// 将分区号右移40位,然后与页号和偏移量的40位进行相与,取得64位long型值return (((long) partitionId) << 40) | compressedAddress;
}

获取分区号

获取分区号,高24位表示分区号。

private static final long MASK_LONG_LOWER_40_BITS = (1L << 40) - 1;
private static final long MASK_LONG_UPPER_24_BITS = ~MASK_LONG_LOWER_40_BITS;// 获取分区号
public int getPartitionId() {// 即取packedRecordPointer的高24位,然后右移去掉低40位即可return (int) ((packedRecordPointer & MASK_LONG_UPPER_24_BITS) >>> 40);
}

获取页号和偏移量

组装的格式:[24 bit 分区号][13 bit 内存页号][27 bit 内存偏移量],提取页号和内存偏移量然后组成64位的long。

// 获取TaskMemoryManager需要的页号和偏移量
public long getRecordPointer() {// 左移24位,去掉分区号,然后取高13位即是页号final long pageNumber = (packedRecordPointer << 24) & MASK_LONG_UPPER_13_BITS;// 直接取低27位即是偏移量final long offsetInPage = packedRecordPointer & MASK_LONG_LOWER_27_BITS;// 将高13位和低27位组合成64位的long型整数return pageNumber | offsetInPage;
}

ShuffleInMemorySorter

ShuffleInMemorySorterUnsafeShuffleWriter内存缓冲区使用的缓冲区和排序器,内部是由LongArray来存储数据,该类主要进行插入数据,存储数据,扩充Array以及返回排序好的数据迭代器等,实际存储的是数据的地址和分区组合结果,我们来分析下它是如何实现的。

私有变量

  1. consumer内存消费者,这个实际是ShuffleExternalSorter,可以向TaskMemoryManager申请和释放内存;
  2. array是数据内存存储的地方,内存可以是堆内或者堆外内存;
  3. useRadixSort是否使用基数排序,如果使用基数排序,数组需要留一定的空间,否则使用TimSort进行排序,由参数spark.shuffle.sort.useRadixSort控制,默认是使用的;
  4. pos表示当前array可以写入数据的下标;
  5. usableCapacity是可用内存容量,当使用基数排序时为申请内存的1/2,否则为申请内存的1/1.5;
  6. initialSize是最初的缓冲区大小,有参数spark.shuffle.sort.initialBufferSize控制,默认是4M;
private final MemoryConsumer consumer;
// 申请的内存的主要表现方式;排序器将操作该对象,代替直接操作记录
private LongArray array;// 是否使用基数排序;基数排序比较快,但需要额外的内存。
private final boolean useRadixSort;private int pos = 0;
private int usableCapacity = 0;// 初始的排序缓冲大小
private int initialSize;

内存申请&释放

内存的申请与释放是通过MemoryConsumer来进行的,可以用来向TaskMemoryManager申请和释放内存,我们来看下申请长度为size的内存的步骤:

  1. 通过TaskMemoryManger来申请一个页;
  2. 如果分配不到页或者分配页的大小比较小,则需要释放申请到的页,抛出OOM异常;
  3. 如果分配到的页满足申请要求,就组装为LongArray返回给ShuffleInMemorySorter使用。
// org.apache.spark.memory.MemoryConsumer
public LongArray allocateArray(long size) {// 计算所需的Page大小。由于长整型占用8个字节,所以需要乘以8。long required = size * 8L;// 分配指定大小的MemoryBlockMemoryBlock page = taskMemoryManager.allocatePage(required, this);// 分配得到的MemoryBlock的大小小于所需的大小if (page == null || page.size() < required) {long got = 0;if (page != null) {got = page.size();// 释放MemoryBlocktaskMemoryManager.freePage(page, this);}// 打印内存使用信息并抛出OutOf MemoryError。taskMemoryManager.showMemoryUsage();throw new OutOfMemoryError("Unable to acquire " + required + " bytes of memory, got " + got);}// 将required累加到used,即更新已经使用的内存大小used += required;// 创建并返回LongArrayreturn new LongArray(page);
}

内存释放比较简单,直接释放相应的页即可。

// org.apache.spark.memory.MemoryConsumer
public void freeArray(LongArray array) {freePage(array.memoryBlock());
}protected void freePage(MemoryBlock page) {// 首先更新usedused -= page.size();// 释放MemoryBlocktaskMemoryManager.freePage(page, this);
}

数据管理

insertRecord插入数据到内存缓冲区中,需要先检查是否还有足够的空间可以插入,然后将分区号和地址进行组装为PackedRecordPointer,放入到LongArray中。

// org.apache.spark.shuffle.sort.ShuffleInMemorySorter
// 判断是否还有剩余空间
public boolean hasSpaceForAnotherRecord() {return pos < usableCapacity;
}public void insertRecord(long recordPointer, int partitionId) {// 检查是否还有空闲空间if (!hasSpaceForAnotherRecord()) {throw new IllegalStateException("There is no space for new record");}// 将转换后得到的PackedRecordPointer设置到array的pos位置array.set(pos, PackedRecordPointer.packPointer(recordPointer, partitionId));pos++;
}// 获取记录数量
public int numRecords() {return pos;
}

当内存使用完后,需要进行扩容,ShuffleExternalSorterTaskMemoryManager中申请内存页,并封装为LongArray,将现有的数据复制到新的数组中,释放老的数组内存即可。

// org.apache.spark.shuffle.sort.ShuffleInMemorySorter
// 对内存进行扩容
public void expandPointerArray(LongArray newArray) {// 检查,扩容后的内存大小应该大于现有内存大小assert(newArray.size() > array.size());// 将当前array中的数据拷贝到newArrayPlatform.copyMemory(array.getBaseObject(),array.getBaseOffset(),newArray.getBaseObject(),newArray.getBaseOffset(),pos * 8L);// 释放当前array占用的内存consumer.freeArray(array);// 将新的newArray替换旧arrayarray = newArray;// 重新计算可用容量usableCapacity = getUsableCapacity();
}

返回排序好的迭代器

排序准则

由于LongArray里面存储的是PackedRecordPointer,对LongArray的排序,就是排序PackedRecordPointer,会按照分区号进行比较,同一分区的数据会在一起,分区内部的key是无序的。

private static final class SortComparator implements Comparator<PackedRecordPointer> {@Overridepublic int compare(PackedRecordPointer left, PackedRecordPointer right) {// 获取两个分区的IDint leftId = left.getPartitionId();int rightId = right.getPartitionId();// 使用分区ID进行比较return leftId < rightId ? -1 : (leftId > rightId ? 1 : 0);}
}

排序

有了元素的比较器,就可以进行排序了,可以使用基数排序或者TimSort,基数排序速度比较快,但是需要额外的内存。

public ShuffleSorterIterator getSortedIterator() {int offset = 0;// 判断是否使用基数排序if (useRadixSort) {// 基数排序,按照分区号升序排序offset = RadixSort.sort(array, pos,PackedRecordPointer.PARTITION_ID_START_BYTE_INDEX,PackedRecordPointer.PARTITION_ID_END_BYTE_INDEX, false, false);} else {// 普通TimSort排序MemoryBlock unused = new MemoryBlock(array.getBaseObject(),array.getBaseOffset() + pos * 8L,(array.size() - pos) * 8L);LongArray buffer = new LongArray(unused);Sorter<PackedRecordPointer, LongArray> sorter =new Sorter<>(new ShuffleSortDataFormat(buffer));// 使用SORT_COMPARATOR,即SortComparator比较器sorter.sort(array, 0, pos, SORT_COMPARATOR);}// 构造为ShuffleSorterIterator后返回return new ShuffleSorterIterator(pos, array, offset);
}

返回迭代器

排完序后需要提供给外界访问排序完数据的迭代器,内部使用packedRecordPointer指向当前记录,外界通过使用hasNext判断是否还有数据,loadNext来重置packedRecordPointer,同时通过packedRecordPointer来获取当前的数据元素。

public static final class ShuffleSorterIterator {// LongArray内部封装了MemoryBlock对象private final LongArray pointerArray;private final int limit;// 记录了分区号,页号及偏移量的记录指针final PackedRecordPointer packedRecordPointer = new PackedRecordPointer();private int position = 0;ShuffleSorterIterator(int numRecords, LongArray pointerArray, int startingPosition) {this.limit = numRecords + startingPosition;this.pointerArray = pointerArray;this.position = startingPosition;}public boolean hasNext() { // postition小于limit时表示还有数据return position < limit;}public void loadNext() {// 从pointerArray中获取指定位置的long型整数设置到packedRecordPointerpackedRecordPointer.set(pointerArray.get(position));// positionposition++;}
}

ShuffleExternalSorter

ShuffleExternalSorter是专门用于对Shuffle数据存储,用于将map任务的输出存储到Tungsten中,在记录超过内存限制时,会利用ShuffleInMemorySort进行数据排序,将数据溢出到磁盘。与ExternalSorter不同,ShuffleExternalSorter本身并没有实现数据的文件合并功能,具体的数据合并是由UnsafeShuffleWriter来实现;另外ShuffleExternalSorter继承与MemoryConsumer,是执行内存消费方,可以向TaskMemoryManager申请和释放内存。

私有变量

  1. numPartitions是reduce的分区数目;taskMemoryManager是任务内存管理器;blockManager是存储管理类;
  2. numElementsForSpillThreshold是溢写磁盘的元素个数,有参数spark.shuffle.spill.numElementsForceSpillThreshold控制,默认是1024 * 1024 * 1024
  3. fileBufferSizeBytes是创建的DiskBlockObjectWriter内部的文件缓冲大小,可以通过spark.shuffle.file.buffer设置,默认为32M;
  4. allocatedPages是已经分配的Page<即MemoryBlock>列表,可能使用多个页表,最多可以使用2^13个页表;
  5. spills溢出文件的元数据信息的列表;
  6. ShuffleInMemorySorter是用于在内存中对插入的记录进行排序;
  7. currentPage是当前使用的page;
  8. pageCursor是Page的光标,实际为用于向Tungsten写入数据时的地址信息。
private final int numPartitions;
private final TaskMemoryManager taskMemoryManager;
private final BlockManager blockManager;
// 溢写阈值
private final long numElementsForSpillThreshold;
// 文件缓冲区
private final int fileBufferSizeBytes;
// 已分配的页表和溢写的文件信息
private final LinkedList<MemoryBlock> allocatedPages = new LinkedList<>();
private final LinkedList<SpillInfo> spills = new LinkedList<>();
// 内存排序
@Nullable private ShuffleInMemorySorter inMemSorter;
// 当前page信息
@Nullable private MemoryBlock currentPage = null;
private long pageCursor = -1;

堆内还是堆外内存

ShuffleExternalSorter继承与MemoryConsumer,那么使用的执行内存到底是堆内还是堆外内存呢,这个是有MemoryConsumer中的mode变量来决定的,我们来看下它是如何初始化的,构造函数中会先调用MemoryConsumer的构造函数,可以看到mode遍历是由TaskMemoryManagergetTungstenMemoryMode获取的,而这个值是通过TaskMemoryManager构造函数初始化的,是获取的memoryManagertungstenMemoryMode

// org.apache.spark.memory.MemoryConsumer
protected MemoryConsumer(TaskMemoryManager taskMemoryManager, long pageSize, MemoryMode mode) {this.taskMemoryManager = taskMemoryManager;this.pageSize = pageSize;this.mode = mode;
}// org.apache.spark.shuffle.sort.ShuffleExternalSorter
ShuffleExternalSorter(TaskMemoryManager memoryManager, BlockManager blockManager,TaskContext taskContext, int initialSize, int numPartitions, SparkConf conf,ShuffleWriteMetrics writeMetrics) {// Tungsten内存模式是通过MemoryManager获取的super(memoryManager,(int) Math.min(PackedRecordPointer.MAXIMUM_PAGE_SIZE_BYTES, memoryManager.pageSizeBytes()),memoryManager.getTungstenMemoryMode());
}// org.apache.spark.memory.TaskMemoryManager
public TaskMemoryManager(MemoryManager memoryManager, long taskAttemptId) {this.tungstenMemoryMode = memoryManager.tungstenMemoryMode();this.memoryManager = memoryManager;this.taskAttemptId = taskAttemptId;this.consumers = new HashSet<>();
}

我们来看下MemoryManagertungstenMemoryMode是如何实现的,tungstenMemoryMode采用枚举类型MemoryMode来表示堆内存和堆外内存:

  1. 当设置了spark.memory.offHeap.enabled参数时候使用堆外内存,但是要保证spark.memory.offHeap.size大于0;
  2. 没有设置堆外内存启用时候则使用堆内内存。
// org.apache.spark.memory.MemoryManager
final val tungstenMemoryMode: MemoryMode = {if (conf.getBoolean("spark.memory.offHeap.enabled", false)) {require(conf.getSizeAsBytes("spark.memory.offHeap.size", 0) > 0,"spark.memory.offHeap.size must be > 0 when spark.memory.offHeap.enabled == true")require(Platform.unaligned(),"No support for unaligned Unsafe. Set spark.memory.offHeap.enabled to false.")MemoryMode.OFF_HEAP} else {MemoryMode.ON_HEAP}}

插入数据

Spark中map任务在执行结束后会将数据写入磁盘,等待reduce任务获取。在写入磁盘之前,Spark可能会对map任务的输出在内存中进行一些排序和聚合。insertRecord()方法是这一过程的入口,步骤如下:

  1. 首先会检查是否达到溢写磁盘的要求,总条数大于等于10亿条,如果达到了就要溢写磁盘;
  2. 不用溢写则要检查ShuffleInMemory是否还有足够的空间将额外的记录插入到排序指针数组中,如果需要额外的空间,则增加数组的容量;如果无法获取所需的空间,则内存中的数据将被溢出到磁盘;
  3. 检查用于存储数据的page是否还有空间,需要的空间是序列化后的k-v长度加上4个字节自己记录这个长度值,如果当前page是空或者不足以容纳改长度的数据,就需要申请新的page;
  4. 使用taskMemoryManager来对页面和偏移量编码,得到数据的页号和偏移量的编码值;
  5. 在当前页和数据偏移量先插入4字节的k-v的序列化后的长度,然后移动偏移指针,在length空间中插入真正的数值,更新偏移指针;
  6. 将当前记录的偏移量的位置给shuffleInMemorySorter,用这个偏移量和分区号进行编码,得到PackedRecordPointer,最后通过它来排序。
public void insertRecord(Object recordBase, long recordOffset, int length, int partitionId) throws IOException {// 判断是否需要溢写磁盘if (inMemSorter.numRecords() >= numElementsForSpillThreshold) {spill();}// 判断缓冲器是否空间充裕growPointerArrayIfNecessary();final int required = length + 4; // 4字节用于在内存中存储数据的长度// 检查是否有足够的空间,如果需要额外的空间,则申请分配新的PageacquireNewPageIfNecessary(required);final Object base = currentPage.getBaseObject();// 返回页号和相对于内存块起始地址的偏移量(64位长整型)。final long recordAddress = taskMemoryManager.encodePageNumberAndOffset(currentPage, pageCursor);// 向Page所代表的内存块的起始地址写入数据的长度Platform.putInt(base, pageCursor, length);pageCursor += 4;// 将记录数据拷贝到Page中Platform.copyMemory(recordBase, recordOffset, base, pageCursor, length);pageCursor += length;// 将记录的元数据信息存储到内部用的长整型数组中,以便于排序。其中高24位存储分区ID,中间13位存储页号,低27位存储偏移量。inMemSorter.insertRecord(recordAddress, partitionId);
}

ShuffleInMemorySorter空间申请

growPointerArrayIfNecessary实现了在ShuffleInMemorySort内存不足时候扩容的操作,步骤如下:

  1. 检查ShuffleInMemorySort内存排序器是否有足够的空间以插入记录;
  2. 如果空间不足则进行扩容,扩容为原来的两倍,allocateArray会向TaskMemoryManager申请内存,申请内存过程中如果内存不足是要对可以释放内存的MemoryConsumer进行磁盘溢写操作释放内存的操作,ShuffleExternalSorter这个MemoryConsumer就可以被释放内存,重置数组pos位置,所以这里可能会导致间接的释放内存操作;
  3. 如果内存不足抛出了OOM异常,需要看是否已经溢写磁盘了,如果没有抛出异常;
  4. 最后再次检查是否进行了溢写,如果溢写了,释放这次申请到的空间,没有溢写则需要用新的内存空间替换旧的内存空间,进行数据复制,这个是通过ShuffleInMemorySortexpandPointerArray实现的。
private void growPointerArrayIfNecessary() throws IOException {// 没有剩余空间了if (!inMemSorter.hasSpaceForAnotherRecord()) {// 获取内存排序器当前使用的内存大小long used = inMemSorter.getMemoryUsage();LongArray array;try {// 尝试进行扩容,扩容为原来的两倍array = allocateArray(used / 8 * 2);} catch (OutOfMemoryError e) { // 产生异常说明内存不足,无法满足申请// 申请内存时虽然出现OutOfMemoryError错误,但由于进行了溢写操作,// 因此当前的MemoryConsumer有可能也发生了溢写,在当前MemoryConsumer出现溢写后,// 会重置调用内存排序器的reset()方法重置,此操作会刷新可使用的内存大小,// 如果内存排序器此时还是没有可用的内存,则抛出异常。if (!inMemSorter.hasSpaceForAnotherRecord()) {throw e;}return;}// 检查是否进行了溢写if (inMemSorter.hasSpaceForAnotherRecord()) {// 进行了溢写,释放扩容时申请的空间freeArray(array);} else {// 没有进行溢写,将申请的扩容空间设置为内存排序器的内存空间inMemSorter.expandPointerArray(array);}}
}

Page申请

ShuffleExternalSorter内部是使用Page来存储数据的,allocatedPages是已经获取到的page列表,currentPages是当前的page,pageCursor是当前page的偏移量,当新的数据需要插入时候,先进行序列化得到序列化后的长度,然后看当前page是否满足:

  1. 当前页currentPage为null,则需要申请第一个MemoryBlock作为当前页;
  2. 当前页currentPage不为null,但当前页的游标加上需要增加的偏移量大于当前页的总大小,说明当前页无法满足需要的空间,则申请一个新的页
  3. 新申请的页添加到allocatedPages中。
private void acquireNewPageIfNecessary(int required) {if (currentPage == null ||pageCursor + required > currentPage.getBaseOffset() + currentPage.size() ) {// 申请新的MemoryBlock作为currentPage,大小为requiredcurrentPage = allocatePage(required);// 将游标pageCursor指向currentPage的baseOffsetpageCursor = currentPage.getBaseOffset();// 将新申请的页放入allocatedPages进行管理allocatedPages.add(currentPage);}
}

Page申请是在MemoryConsumer中进行的,主要是通过taskMemoryManager来申请页表<现根据内存模式申请相应的空间,空间不足可能会涉及到其他consumer的溢写磁盘操作,如果还不能满足,当前consumer可能会溢写磁盘,最后分配页>,并记录使用的空间大小。

// org.apache.spark.memor.MemoryConsumer
protected MemoryBlock allocatePage(long required) {// 使用TaskMemoryManager的allocatePage()方法申请MemoryBlock page = taskMemoryManager.allocatePage(Math.max(pageSize, required), this);// 判断申请是否足够if (page == null || page.size() < required) {long got = 0;if (page != null) {got = page.size();// 申请不足,释放已申请的大小taskMemoryManager.freePage(page, this);}taskMemoryManager.showMemoryUsage();throw new OutOfMemoryError("Unable to acquire " + required + " bytes of memory, got " + got);}// 将required累加到used,即更新已经使用的内存大小used += page.size();// 返回MemoryBlock对象return page;
}

溢写磁盘

当记录数目达到阈值或者申请不到内存时候都有可能进行溢写磁盘操作,会将当前内存中的元素排序后写入磁盘。

磁盘存储文件

磁盘文件抽象表示为SpillInfo,记录了block的一些元数据信息,记录了每个分区的长度,文件的名字以及对应的BlockId。比较简单,如下所示:

final class SpillInfo {final long[] partitionLengths;final File file;final TempShuffleBlockId blockId;SpillInfo(int numPartitions, File file, TempShuffleBlockId blockId) {this.partitionLengths = new long[numPartitions];this.file = file;this.blockId = blockId;}
}

溢写入口

ShuffleExternalSorter继承了MemoryConsumer,实现了Spill方法,方法比较简单,主要是调用writeSortedFile将数据写入到文件中,释放内存,重置inMemSorter

public long spill(long size, MemoryConsumer trigger) throws IOException {if (trigger != this || inMemSorter == null || inMemSorter.numRecords() == 0) {return 0L;}writeSortedFile(false);// 将所使用的Page(即MemoryBlock)全部释放final long spillSize = freeMemory();// 重置ShuffleMemorySorter底层的长整型数组,便于下次排序inMemSorter.reset();// 更新任务度量信息taskContext.taskMetrics().incMemoryBytesSpilled(spillSize);// 返回溢出的数据大小return spillSize;
}

我们顺便看下freeMemory操作,主要是对已经分配的页表的每个页进行内存释放<MemoryConsumer执行>,清空占用的页表列表。

// 释放内存
private long freeMemory() {// 更新内存使用峰值updatePeakMemoryUsed();long memoryFreed = 0;// 遍历所有的MemoryBlockfor (MemoryBlock block : allocatedPages) {// 累计释放的内存大小memoryFreed += block.size();// 使用freeBlock()方法释放freePage(block);}// 清空保存MemoryBlock的列表allocatedPages.clear();// 清空当前使用的MemoryBlock页currentPage = null;// 清空页游标pageCursor = 0;// 返回释放的内存总大小return memoryFreed;
}

真正溢写操作

writeSortedFile是真正的将内存中的记录进行排序后输出到磁盘的入口,步骤如下:

  1. 首先获取内存中基于分区排序好的数据迭代器;
  2. 新建溢出的数据文件,创建一个TempShuffleBlockId来唯一标识,将分区数、文件对象和BlockId封装为SpillInfo溢写信息对象;
  3. 初始化DiskBlockObjectWriter,是通过BlockManager获取的,指定了序列化对象,缓冲区大小,blockId等;
  4. 遍历按照分区排序好的数据迭代器,不断的获取数据,查看数据的分区号,如果分区号跟之前不一样<之前不为-1,-1代表还没开始读取一个分区数据>,说明一个分区的数据读取完毕,Writer需要将输出流中的数据写入到磁盘,并返回FileSegment,记录了该分区在文件中的偏移量以及长度,根据这些信息需要更新SpillInfo的上一个分区的长度信息;
  5. 然后根据PackedRecordPointer <[24 bit 分区号][13 bit 内存页号][27 bit 内存偏移量]>获取内存页号+页内偏移量组成的64位地址,TaskMemoryManager会解码出来页号和偏移量,由于数据在页中存储格式是[4字节的k-v序列化的数据长度][k-v序列化后的数据]通过页号和偏移量,先获取4字节的数据长度,然后不断的从接下来的数据位置获取数据写入到缓冲区中,是通过Platform来进行数据copy的,然后写入wrier中,不断地更新偏移量,直到读取完成,Writer会对记录数进行统计;
  6. Map任务输出数据已写完,将缓冲区中的数据写出到磁盘,更新最后一个分区的长度,然后加入到spillInfos中,等待后续合并多个磁盘文件。
private void writeSortedFile(boolean isLastFile) throws IOException {// 度量相关...// >>>>1. 获取基于内存的Shuffle排序迭代器final ShuffleInMemorySorter.ShuffleSorterIterator sortedRecords =inMemSorter.getSortedIterator();// >>>>2. 在磁盘上创建唯一的TempShuffleBlockId和对应的文件,作为溢出操作的数据存储文件。final Tuple2<TempShuffleBlockId, File> spilledFileInfo =blockManager.diskBlockManager().createTempShuffleBlock();// 获取文件对象final File file = spilledFileInfo._2();// 获取对应的BlockIdfinal TempShuffleBlockId blockId = spilledFileInfo._1();// 将分区数、文件对象和BlockId封装为SpillInfo溢写信息对象final SpillInfo spillInfo = new SpillInfo(numPartitions, file, blockId);// >>>>3. 磁盘写缓冲,1MBfinal byte[] writeBuffer = new byte[DISK_WRITE_BUFFER_SIZE];final SerializerInstance ser = DummySerializerInstance.INSTANCE;// 获取DiskBlockObjectWriterfinal DiskBlockObjectWriter writer =blockManager.getDiskWriter(blockId, file, ser, fileBufferSizeBytes, writeMetricsToUse);int currentPartition = -1;// 如果还有经过排序的索引记录while (sortedRecords.hasNext()) {// >>>>>4. 取出下一条索引记录sortedRecords.loadNext();// 获取索引中记录的分区号final int partition = sortedRecords.packedRecordPointer.getPartitionId();// 分区号需要大于currentPartition,以递增写入// 检查分区号是否与当前的相同if (partition != currentPartition) {// 不相同,且currentPartition不为-1,则需要转换为新分区if (currentPartition != -1) {// 先将当前的缓冲流中的数据写出到磁盘,返回的是当前写出操作的FileSegment// FileSegment对象包含了当前写出数据的文件、偏移量和长度final FileSegment fileSegment = writer.commitAndGet();// 记录当前分区的数据长度spillInfo.partitionLengths[currentPartition] = fileSegment.length();}// 更新currentPartition为下一个分区号currentPartition = partition;}// >>>>> 5. 获取当前记录的PackedRecordPointer指针中的Map任务输出数据存储的页号和偏移量final long recordPointer = sortedRecords.packedRecordPointer.getRecordPointer();// 从TaskMemoryManager中获取Map任务输出数据在内存中对应的页和偏移量final Object recordPage = taskMemoryManager.getPage(recordPointer);final long recordOffsetInPage = taskMemoryManager.getOffsetInPage(recordPointer);// 读取4字节的Int值,作为dataRemainingint dataRemaining = Platform.getInt(recordPage, recordOffsetInPage);// 跳过4字节的Int值,即跳过dataRemaining的长度long recordReadPosition = recordOffsetInPage + 4;// 当还有剩余数据时while (dataRemaining > 0) {// 计算需要进行transfer的数据,大小不能超过DISK_WRITE_BUFFER_SIZE(1MB)final int toTransfer = Math.min(DISK_WRITE_BUFFER_SIZE, dataRemaining);// 将数据拷贝到writeBufferPlatform.copyMemory(recordPage, recordReadPosition, writeBuffer, Platform.BYTE_ARRAY_OFFSET, toTransfer);// 将writeBuffer的数据通过DiskBlockObjectWriter先写出到缓冲区writer.write(writeBuffer, 0, toTransfer);// 更新偏移量recordReadPosition += toTransfer;// 更新剩余数据量dataRemaining -= toTransfer;}// 对写入的记录数进行统计和度量writer.recordWritten();}// >>>>> 6.Map任务输出数据已写完,将缓冲区中的数据写出到磁盘final FileSegment committedSegment = writer.commitAndGet();// 关闭写出器writer.close();// 检查currentPartition是否为-1,如果不为-1,说明是有数据写出的if (currentPartition != -1) {// 更新最后一个写出分区的数据长度spillInfo.partitionLengths[currentPartition] = committedSegment.length();// 将spillInfo记录到spills数组spills.add(spillInfo);}...
}

清理内存数据

ShuffleExternalSorter不进行最后的文件合并,不过提供给了closeAndGetSpills函数,将最后在内存中的数据写入到磁盘中,并返回目前已经落到磁盘文件的文件信息数组,供UnsafeShuffleWriter进行合并操作。

public SpillInfo[] closeAndGetSpills() throws IOException {try {if (inMemSorter != null) {// Do not count the final file towards the spill count.// 传入的isLastFile为truewriteSortedFile(true);// 释放内存freeMemory();// 释放排序器内存inMemSorter.free();inMemSorter = null;}// 将spills数组重新构建为一个新的SpillInfo数组并返回return spills.toArray(new SpillInfo[spills.size()]);} catch (IOException e) {// 出现异常则清理资源并抛出异常cleanupResources();throw e;}
}

UnsafeShuffleWriter

UnsafeShuffleWriter的流程和SortShuffleWriter相似,都是先将数据放在内存缓冲区中,然后缓冲区内存不足时候,向TaskMemoryManager申请内存,等到达到Spill条件时候,将内存中数据按照分区进行排序刷新到磁盘中,形成一个或者多个小文件,等数据写完时候,将内存中和磁盘中的文件进行合并,生成最后的数据文件,并生成索引文件。

私有变量&初始化

  1. transferToEnabled是否使用NIO,由参数spark.file.transferTo决定, 最后对文件合并时是否使用NIO的方式进行file stream的copy,默认为true;
  2. initialSortBufferSize是初始化的排序缓冲大小,由参数 spark.shuffle.sort.initialBufferSize决定,默认为4KB,这个比SortedBaseShuffleWrite那个初始化的缓冲大小要小多了,那个是5M,反正也会扩容的);
  3. ShuffleExternalSorter是进行数据排序和溢写磁盘操作的,上面详细讲解过;
  4. serBuffer是及进行序列化数据存储的,serOutputStream是将serBuffer包装为序列化流的对象。
public UnsafeShuffleWriter(BlockManager blockManager, IndexShuffleBlockResolver shuffleBlockResolver,TaskMemoryManager memoryManager, SerializedShuffleHandle<K, V> handle, int mapId,TaskContext taskContext,  SparkConf sparkConf) throws IOException {// 由于UnsafeShuffleWriter是SerializedShuffleHandle对应的序列化模式下的ShuffleWriter,// 所以需要检查分区器的分区总数,不可大于16777216final int numPartitions = handle.dependency().partitioner().numPartitions();if (numPartitions > SortShuffleManager.MAX_SHUFFLE_OUTPUT_PARTITIONS_FOR_SERIALIZED_MODE()) {// 分区总数大于16777216,抛异常...}this.blockManager = blockManager;// 用于操作索引文件与数据文件this.shuffleBlockResolver = shuffleBlockResolver;this.memoryManager = memoryManager;this.mapId = mapId;final ShuffleDependency<K, V, V> dep = handle.dependency();this.shuffleId = dep.shuffleId();// 序列化器this.serializer = dep.serializer().newInstance();// 分区器this.partitioner = dep.partitioner();this.writeMetrics = taskContext.taskMetrics().shuffleWriteMetrics();this.taskContext = taskContext;this.sparkConf = sparkConf;// 是否允许transferTo操作,默认允许this.transferToEnabled = sparkConf.getBoolean("spark.file.transferTo", true);// 初始状态的排序缓冲区大小,默认为4096字节,即4Kthis.initialSortBufferSize = sparkConf.getInt("spark.shuffle.sort.initialBufferSize", DEFAULT_INITIAL_SORT_BUFFER_SIZE);// 调用open()方法open();
}private void open() throws IOException {assert (sorter == null);// 创建外部Shuffle排序器sorter = new ShuffleExternalSorter(memoryManager,blockManager, taskContext, initialSortBufferSize, partitioner.numPartitions(), sparkConf, writeMetrics);// 序列化缓冲serBuffer = new MyByteArrayOutputStream(1024 * 1024);// 包装为序列化流对象serOutputStream = serializer.serializeStream(serBuffer);
}

整体流程

writer的主要工作是对上游RDD的迭代器得到的K-V数据进行缓存,排序,落盘最后合并成一个文件通知MapOutTracker更新ShuffleMap任务结束信息供Reduce数据读取,流程比较简单,主要是遍历每一条记录,使用insertRecordIntoSorter处理数据,最后遍历完后通过closeAndWriteOutput进行文件合并操作。

// 将map任务的输出结果写到磁盘
public void write(scala.collection.Iterator<Product2<K, V>> records) throws IOException {boolean success = false;try {// 迭代输入的每条记录while (records.hasNext()) { // 将记录插入排序器insertRecordIntoSorter(records.next());}// 将map任务输出的数据持久化到磁盘closeAndWriteOutput();success = true;} finally {// 写出完成后,需要使用ShuffleExternalSorter进行资源清理if (sorter != null) {try {sorter.cleanupResources();} catch (Exception e) {...}}}
}

插入数据

我们来看下insertRecordIntoSorter的具体实现:

  1. 首先获取key,根据partition函数得到分区ID;
  2. 重置序列化缓冲区,并且将记录的key以及value序列化操作,写入缓冲区中;
  3. 得到缓冲区的大小,这是key+value的连续空间的长度;
  4. 通过ShuffleExternalSorterserBuffer底层的序列化字节数组插入到Tungsten的内存中。
void insertRecordIntoSorter(Product2<K, V> record) throws IOException { final K key = record._1();// 计算记录的分区IDfinal int partitionId = partitioner.getPartition(key);// 重置serBufferserBuffer.reset();// 将记录写入到serOutputStream中进行序列化serOutputStream.writeKey(key, OBJECT_CLASS_TAG);serOutputStream.writeValue(record._2(), OBJECT_CLASS_TAG);serOutputStream.flush();// 得到序列化后的数据大小final int serializedRecordSize = serBuffer.size();assert (serializedRecordSize > 0);// 将serBuffer底层的序列化字节数组插入到Tungsten的内存中sorter.insertRecord(serBuffer.getBuf(), Platform.BYTE_ARRAY_OFFSET, serializedRecordSize, partitionId);
}

合并最终结果

在遍历完迭代器数据后,会进行合并内存和已经落盘的文件,得到最终结果:

  1. 更新使用内存的峰值,供Spark UI界面显示使用;
  2. ShuffleInMemorySorter内存中的数据落盘,并获取到所有落盘文件;
  3. 合并所有文件为一个文件,具体的合并操作后面讲解;
  4. 生成数据文件和索引文件,返回MapStatus
void closeAndWriteOutput() throws IOException { // 更新使用内存的峰值updatePeakMemoryUsed();serBuffer = null;serOutputStream = null;// 关闭ShuffleExternalSorter,获得溢出文件信息的数组final SpillInfo[] spills = sorter.closeAndGetSpills();// 将ShuffleExternalSorter置为nullsorter = null;final long[] partitionLengths;// 获取正式的输出数据文件final File output = shuffleBlockResolver.getDataFile(shuffleId, mapId);// 创建临时文件final File tmp = Utils.tempFileWith(output);try {try {// 合并所有溢出文件到正式的输出数据文件,返回的是每个分区的数据长度partitionLengths = mergeSpills(spills, tmp);} finally {// 因为合并成了一个文件,因此删除溢写的文件for (SpillInfo spill : spills) {if (spill.file.exists() && ! spill.file.delete()) {logger.error("Error while deleting spill file {}", spill.file.getPath());}}}// 根据partitionLengths数组创建索引文件shuffleBlockResolver.writeIndexFileAndCommit(shuffleId, mapId, partitionLengths, tmp);} finally {if (tmp.exists() && !tmp.delete()) {logger.error("Error while deleting temp file {}", tmp.getAbsolutePath());}}// 构造并返回MapStatus对象mapStatus = MapStatus$.MODULE$.apply(blockManager.shuffleServerId(), partitionLengths);
}

生成数据文件

mergeSpills是真正进行合并的入口,会根据不同情况进行不同的操作,有以下几个变量:

  1. encryptionEnabled:是否启用加密,默认为false,通过 spark.io.encryption.enabled 参数来设置;
  2. transferToEnabled:是否可以使用nio的transferTo传输,默认为true,通过spark.file.transferTo 参数来设置;
  3. compressionEnabled:是否使用压缩,默认为true,通过 spark.shuffle.compress参数来设置;
  4. compressionCodec:默认压缩类,默认为LZ4CompressionCodec,通过 spark.io.compression.codec 参数来设置;
  5. fastMergeEnabled:是否启用fast merge,默认为true,通过 spark.shuffle.unsafe.fastMergeEnabled参数来设置;
  6. fastMergeIsSupported:是否支持 fast merge,如果不使用压缩或者是压缩算法是以下几种:org.apache.spark.io.SnappyCompressionCodecorg.apache.spark.io.LZFCompressionCodecorg.apache.spark.io.LZ4CompressionCodecorg.apache.spark.io.ZStdCompressionCodec这四种支持连接的压缩算法中的一种都是可以使用 fast merge的。

执行步骤如下:

  1. 首先初始化以上几个变量;
  2. 如果溢出文件为0,直接返回全是0的分区数组;
  3. 如果溢出文件为1,文件重命名后返回只有一个元素的分区数组;
  4. 如果溢出文件多于1个则,多个溢出文件开始merge,merge分为三种方式:transfered-based fast mergefileStream-based fast merge以及slow merge三种方式,第一种是通过mergeSpillsWithTransferTo函数实现,后面两种是通过mergeSpillsWithFileStream实现,区别在于是否有压缩器。
    1. 使用transfered-based fast merge条件是使用 fast merge并且压缩算法支持fast merge,并且启用了nio的transferTo传输且不启用文件加密;
    2. 使用fileStream-based fast merge条件是使用 fast merge并且压缩算法支持fast merge,并且未启用nio的transferTo传输或启用了文件加密;
    3. 使用slow merge条件是未使用 fast merge或压缩算法不支持fast merge。
private long[] mergeSpills(SpillInfo[] spills, File outputFile) throws IOException {/** 几个主要变量 **/// 是否开启了解压缩,由spark.shuffle.compress参数配置,默认为truefinal boolean compressionEnabled = sparkConf.getBoolean("spark.shuffle.compress", true);// 压缩编解码器final CompressionCodec compressionCodec = CompressionCodec$.MODULE$.createCodec(sparkConf);// 是否开启了快速合并,由spark.shuffle.unsafe.fastMergeEnabled参数配置,默认为truefinal boolean fastMergeEnabled = sparkConf.getBoolean("spark.shuffle.unsafe.fastMergeEnabled", true);/*** 检查是否支持快速合并:* 1. 没有开启压缩,则可以快速合并;* 2. 开启了压缩,但需要压缩编解码器支持对级联序列化流进行解压缩。*    支持该功能的压缩编解码有Snappy、LZ4、LZF三种。*/final boolean fastMergeIsSupported = !compressionEnabled ||CompressionCodec$.MODULE$.supportsConcatenationOfSerializedStreams(compressionCodec);// 是否开启了数据加解密final boolean encryptionEnabled = blockManager.serializerManager().encryptionEnabled();try {if (spills.length == 0) { // 没有溢写文件,创建空文件new FileOutputStream(outputFile).close(); // 返回分区总数大小的空的long数组return new long[partitioner.numPartitions()];} else if (spills.length == 1) { // 有一个溢写文件 // 直接将该文件重命名为outputFile文件Files.move(spills[0].file, outputFile);// 返回分区的数据长度数组return spills[0].partitionLengths;} else { // 有多个溢写文件// 用于存放每个分区对应的输出数据的长度的数组final long[] partitionLengths;if (fastMergeEnabled && fastMergeIsSupported) { // 开启并支持快速合并if (transferToEnabled && !encryptionEnabled) { // 开启了NIO复制方式,且未开启加解密 // 使用mergeSpillsWithTransferTo()方法的transferTo-based fast方式进行合并partitionLengths = mergeSpillsWithTransferTo(spills, outputFile);} else {// 使用mergeSpillsWithFileStream()方法的fileStream-based fast方式进行合并,压缩编解码器传null partitionLengths = mergeSpillsWithFileStream(spills, outputFile, null);}} else {// 使用mergeSpillsWithFileStream()方法的slow方式进行合并,传递了压缩编解码器 partitionLengths = mergeSpillsWithFileStream(spills, outputFile, compressionCodec);} // 记录度量信息writeMetrics.decBytesWritten(spills[spills.length - 1].file.length());writeMetrics.incBytesWritten(outputFile.length());return partitionLengths;}} catch (IOException e) {if (outputFile.exists() && !outputFile.delete()) {logger.error("Unable to delete output file {}", outputFile.getPath());}throw e;}
}

transfered-based fast merge

mergeSpillsWithTransferTo是基于Java NIO,通过channel直接传输数据,内部通过两层循环遍历每个文件的每个分区,将分区相同的数据合并到一起,要求数据不能是压缩的,或者支持级联压缩流的解压缩,步骤如下:

  1. 创建Reduce数目的数组,用来记录每个Reduce对应的数据长度;
  2. 创建溢写磁盘文件等大小的FileChannel数组,通过channel获取数据,后面会进行初始化,获取每个磁盘文件的fileChannel;
  3. 创建最终存储文件的FileChannel并初始化;
  4. 迭代每个分区,由于不需要聚合操作,只需要将每个磁盘文件中相应分区的数据写入到最终结果文件中,所以可以先获取长度,然后不断使用FileChanneltransferTo()方法将数据从溢写文件的FileChannel传输到合并文件的FileChannel
  5. 更新每个分区的长度信息;
  6. 最后关闭各个文件。
private long[] mergeSpillsWithTransferTo(SpillInfo[] spills, File outputFile) throws IOException {// >>>> 1. reduce分区长度的数组创建final int numPartitions = partitioner.numPartitions();final long[] partitionLengths = new long[numPartitions];// >>>> 2. 创建FileChannel数组,用于存放每个溢写文件的FileChannel对象final FileChannel[] spillInputChannels = new FileChannel[spills.length];// 创建存放每个溢写文件的FileChannel的position的数组final long[] spillInputChannelPositions = new long[spills.length];// >>> 3. 合并输出文件的FileChannelFileChannel mergedFileOutputChannel = null;boolean threwException = true;try {// >>> 2.x 初始化channel,获取每个溢写文件的FileChannel,存放到spillInputChannels数组for (int i = 0; i < spills.length; i++) {spillInputChannels[i] = new FileInputStream(spills[i].file).getChannel();}// >>> 3.x 获取合并输出文件的FileChannelmergedFileOutputChannel = new FileOutputStream(outputFile, true).getChannel();// 传输总字节数long bytesWrittenToMergedFile = 0;// >> 4. 遍历分区,拿取该分区的所有数据,不需要合并,只需要拿取即可for (int partition = 0; partition < numPartitions; partition++) {// 遍历溢写文件的SpillInfo对象for (int i = 0; i < spills.length; i++) {// >>> 4.1 获取溢写文件信息SpillInfo对象中记录的对应分区(即partition分区)的数据长度final long partitionLengthInSpill = spills[i].partitionLengths[partition];// 需要TransferTo的字节数long bytesToTransfer = partitionLengthInSpill;// 获取对应溢写文件的FileChannelfinal FileChannel spillInputChannel = spillInputChannels[i];// 开始时间final long writeStartTime = System.nanoTime();// >>> 4.2 拿取数据,当还有需要TransferTo的数据while (bytesToTransfer > 0) {// 使用FileChannel的transferTo()方法将数据从溢写文件的FileChannel传输到合并文件的FileChannel,final long actualBytesTransferred = spillInputChannel.transferTo(spillInputChannelPositions[i],bytesToTransfer,mergedFileOutputChannel);// 更新该溢写文件的position记录spillInputChannelPositions[i] += actualBytesTransferred;// 更新剩余字节数记录bytesToTransfer -= actualBytesTransferred;}writeMetrics.incWriteTime(System.nanoTime() - writeStartTime);// 一个溢写文件传输完成,更新传输字节计数bytesWrittenToMergedFile += partitionLengthInSpill;// 更新partition分区本次从溢写文件传输的字节数partitionLengths[partition] += partitionLengthInSpill;}} // 检查合并输出文件的position是否等于合并的总字节数if (mergedFileOutputChannel.position() != bytesWrittenToMergedFile) {throw ...}threwException = false;} finally {// >>> 5.检查每个溢写文件的FileChannel的position是否与文件的大小相同for (int i = 0; i < spills.length; i++) {// 检查每个溢写文件的FileChannel的position是否与文件的大小相同// 关闭溢写文件的FileChannelCloseables.close(spillInputChannels[i], threwException);}// 关闭合并文件的FileChannelCloseables.close(mergedFileOutputChannel, threwException);}// 返回存放了每个分区的数据长度的数组return partitionLengths;
}

fileStream-based fast merge

使用使用Java标准的流式IO来读取数据,并合并溢写文件,这种方式比NIO TransferTo的方式要慢。满足以下三种情况之一会使用该方式:需要支持加解密功能。压缩编解码器不支持对级联序列化流进行解压缩。当开发者自行禁用了transferTo的功能。步骤如下:

  1. 创建Reduce数目的数组,用来记录每个Reduce对应的数据长度;
  2. 创建溢写磁盘文件等大小的FileInputStream数组,获取磁盘文件的数据流;
  3. 创建最终存储文件的CountingOutputStream并初始化,带有计数器的OutPutStream;
  4. 迭代每个分区,由于不需要聚合操作,只需要将每个磁盘文件中相应分区的数据写入到最终结果文件中,首先要对输出流进行包装,加上写出时时间记录功能以及自动关闭功能,另外还需要加解密包装,如果需要解压缩,还得加压缩包装;
  5. 对于每个磁盘文件,获取分区对应的文件位置和长度,copy到输出流中,并更新最终分区的长度;
  6. 最后关闭各个文件。
private long[] mergeSpillsWithFileStream(SpillInfo[] spills,File outputFile, @Nullable CompressionCodec compressionCodec) throws IOException {// >>>> 1. 创建存储分区数据长度的数组final int numPartitions = partitioner.numPartitions();final long[] partitionLengths = new long[numPartitions];// >>> 2. 创建保存溢写文件输入流的数组final InputStream[] spillInputStreams = new FileInputStream[spills.length];// >>>> 3. 创建合并输出流,该流对FileOutputStream进行了包装,提供了字节计数功能final CountingOutputStream mergedFileOutputStream = new CountingOutputStream(new FileOutputStream(outputFile));boolean threwException = true;try {// >>> 2.x 遍历所有的溢写文件信息对象SpillInfo,为每个溢写文件创建FileInputStreamfor (int i = 0; i < spills.length; i++) {spillInputStreams[i] = new FileInputStream(spills[i].file);}// >>> 4. 遍历分区号for (int partition = 0; partition < numPartitions; partition++) {// 从mergedFileOutputStream中获取当前已合并的字节数final long initialFileLength = mergedFileOutputStream.getByteCount();// 再次进行包装,得到针对当前分区号的输出流// 1. TimeTrackingOutputStream的包装提供了写出时时间记录功能。// 2. CloseShieldOutputStream的包装了close()方法,屏蔽了对被包装流的关闭操作。OutputStream partitionOutput = new CloseShieldOutputStream(new TimeTrackingOutputStream(writeMetrics, mergedFileOutputStream));// 加解密包装partitionOutput = blockManager.serializerManager().wrapForEncryption(partitionOutput);// 解压缩包装if (compressionCodec != null) {partitionOutput = compressionCodec.compressedOutputStream(partitionOutput);}// 遍历溢写文件信息对象SpillInfo,由于每个溢写文件中包含了多个分区的数据,因此需要遍历每个溢写文件,// 并得到每个溢写文件中记录的对应分区的数据for (int i = 0; i < spills.length; i++) {// 获取每个溢写文件中记录的对应分区(即外层for循环中循环到的partition分区)的数据大小final long partitionLengthInSpill = spills[i].partitionLengths[partition];if (partitionLengthInSpill > 0) { // 分区溢写数据大于0// 将当前溢写文件的输入流,根据对应分区的数据大小包装为LimitedInputStream流InputStream partitionInputStream = new LimitedInputStream(spillInputStreams[i], partitionLengthInSpill, false);try {// 加解密包装partitionInputStream = blockManager.serializerManager().wrapForEncryption(partitionInputStream);// 解压缩包装if (compressionCodec != null) {partitionInputStream = compressionCodec.compressedInputStream(partitionInputStream);}// 将数据拷贝到partitionOutputByteStreams.copy(partitionInputStream, partitionOutput);} finally {// 关闭LimitedInputStream流,但不会关闭底层被包装的流partitionInputStream.close();}}}// 刷新数据partitionOutput.flush();partitionOutput.close();// 记录分区数据长度partitionLengths[partition] = (mergedFileOutputStream.getByteCount() - initialFileLength);}threwException = false;} finally {// 关闭溢写文件的输入流for (InputStream stream : spillInputStreams) {Closeables.close(stream, threwException);}// 关闭合并文件的输出流Closeables.close(mergedFileOutputStream, threwException);}// 返回存放了每个分区的数据长度的数组return partitionLengths;
}

slow merge

slow merge也是使用mergeSpillsWithFileStream方法进行合并,只不过会多传入一个压缩类来包装输入输出流。

更新索引

更新索引跟SortBasedWriter逻辑一样,生成索引和数据文件,构造MapStatus返回。

参考

  1. https://www.cnblogs.com/johnny666888/p/11291546.html
  2. https://www.jianshu.com/p/36780de5066f
  3. https://blog.csdn.net/zc19921215/article/details/86500383
  4. https://developer.aliyun.com/article/60184

Spark Shuffle源码分析系列之UnsafeShuffleWriter相关推荐

  1. Spark Shuffle源码分析系列之PartitionedPairBufferPartitionedAppendOnlyMap

    概述 SortShuffleWriter使用ExternalSorter进行ShuffleMapTask数据内存以及落盘操作,ExternalSorter中使用内存进行数据的缓存过程中根据是否需要ma ...

  2. Spark DAGScheduler源码分析系列之一: 基础

    DAGScheduler DAGScheduler是Spark中比较重要的类,实现了面向DAG的高层次调度,DAGScheduler通过计算将DAG中的一系列RDD划分到不同的Stage,然后构建这些 ...

  3. k8s源码分析 pdf_Spark Kubernetes 的源码分析系列 - features

    1 Overview features 包里的代码,主要是用于构建 Spark 在 K8S 中的各类资源所需要的特征,个人觉得可以理解成这些 features 就是帮你写各类 Kind 的 YAML ...

  4. Kylin源码分析系列三—rowKey编码

    Kylin源码分析系列三-rowKey编码 注:Kylin源码分析系列基于Kylin的2.5.0版本的源码,其他版本可以类比. 1. 相关概念 前面介绍了Kylin中Cube构建的流程,但Cube数据 ...

  5. jQuery源码分析系列

    声明:本文为原创文章,如需转载,请注明来源并保留原文链接Aaron,谢谢! 版本截止到2013.8.24 jQuery官方发布最新的的2.0.3为准 附上每一章的源码注释分析 :https://git ...

  6. MyBatis 源码分析系列文章合集

    1.简介 我从七月份开始阅读MyBatis源码,并在随后的40天内陆续更新了7篇文章.起初,我只是打算通过博客的形式进行分享.但在写作的过程中,发现要分析的代码太多,以至于文章篇幅特别大.在这7篇文章 ...

  7. MyBatis 源码分析系列文章导读

    1.本文速览 本篇文章是我为接下来的 MyBatis 源码分析系列文章写的一个导读文章.本篇文章从 MyBatis 是什么(what),为什么要使用(why),以及如何使用(how)等三个角度进行了说 ...

  8. Spring IOC 容器源码分析系列文章导读

    1. 简介 前一段时间,我学习了 Spring IOC 容器方面的源码,并写了数篇文章对此进行讲解.在写完 Spring IOC 容器源码分析系列文章中的最后一篇后,没敢懈怠,趁热打铁,花了3天时间阅 ...

  9. Spring IOC 容器源码分析系列文章导读 1

    1. 简介 Spring 是一个轻量级的企业级应用开发框架,于 2004 年由 Rod Johnson 发布了 1.0 版本.经过十几年的迭代,现在的 Spring 框架已经非常成熟了.Spring ...

最新文章

  1. android 搜索框 github,Github上Android简单好用的提示框推荐 | 夕辞
  2. 云计算之路:数据库迁移方案
  3. 成功解决building ‘snappy._snappy‘ extension error: Microsoft Visual C++ 14.0 is required. Get it with “B
  4. 引用js_js值和引用
  5. nginx访问量统计
  6. JavaScript高级程序设计(第3版)中文在线阅读,也可以免费下载~
  7. 使用Python对比两个excel表格中的重复数据
  8. Android material design 之 BottomSheet基础入门
  9. 浙商证券显示计算机丢失,无法启动此程序,计算机丢失d3dx-942.dll,这个怎么解决啊?...
  10. winvnc源码阅读笔记(四)---------vncDesktopThread线程
  11. 快速入门Linxu笔记
  12. 详解数据仓库建设体系
  13. 从键盘读入一个字符,如果该字符是大写字母则转小写,如果该字符是小写字母则转大写,如果不是字符则输出不是字母。
  14. PHP的isset和empty的区别
  15. 【IBM Tivoli Identity Manager 学习文档】9 怎样自定义ITIM的界面
  16. 基于ArrayList实现HashMap代码
  17. LoRa点对点系统3 与PC接口
  18. RK3308 Linux UAC与ADB 复合同时使用
  19. Oracle_如何应对润秒
  20. NeurIPS 2019:17篇论文,详解图的机器学习趋势

热门文章

  1. 装饰模式:换装系统设计
  2. qc35 说明书_教你Bose QC35耳机的使用方法
  3. 蚂蚁金服分布式中间件开源第三弹: 下一代微服务SOFAMesh
  4. 3款Android版epub阅读器推荐
  5. Flutter,SharedPreferences的同步处理,如Android原生般的
  6. 配置nginx+mongrel的rails部署环境
  7. 虚拟机服务器的IP无法连接,虚拟机远程服务器连接ip
  8. CentOs解决下载速度慢 更换下载源
  9. 超级全能CPU+GPU异构超算平台
  10. 2009年7月22日全食 日全食观测方法 发生原理