在上一篇《深入理解Spark 2.1 Core (九):迭代计算和Shuffle的原理与源码分析》提到经过迭代计算后, SortShuffleWriter.write中:

    // 根据排序方式,对数据进行排序并写入内存缓冲区。// 若排序中计算结果超出的阈值,// 则将其溢写到磁盘数据文件sorter.insertAll(records)
  • 1

我们先来宏观的了解下Map端,我们会根据aggregator.isDefined是否定义了聚合函数和ordering.isDefined是否定义了排序函数分为三种:

  • 没有聚合和排序,数据先按照partition写入不同的文件中,最后按partition顺序合并写入同一文件 。适合partition数量较少时。将多个bucket合并到同一文件,减少map输出文件数,节省磁盘I/O,提高性能。
  • 没有聚合但有排序,在缓存对数据先根据分区(或者还有key)进行排序,最后按partition顺序合并写入同一文件。适合当partition数量较多时。将多个bucket合并到同一文件,减少map输出文件数,节省磁盘I/O,提高性能。缓存使用超过阈值,将数据写入磁盘。
  • 有聚合有排序,现在缓存中根据key值聚合,再在缓存对数据先根据分区(或者还有key)进行排序,最后按partition顺序合并写入同一文件。将多个bucket合并到同一文件,减少map输出文件数,节省磁盘I/O,提高性能。缓存使用超过阈值,将数据写入磁盘。逐条的读取数据,并进行聚合,减少了内存的占用。

我们先来深入看下insertAll

  def insertAll(records: Iterator[Product2[K, V]]): Unit = {// 若定义了聚合函数,则shouldCombine为trueval shouldCombine = aggregator.isDefined// 外部排序是否需要聚合if (shouldCombine) {      // mergeValue 是 对 Value 进行 merge的函数val mergeValue = aggregator.get.mergeValue// createCombiner 为生成 Combiner 的 函数val createCombiner = aggregator.get.createCombinervar kv: Product2[K, V] = null// update 为偏函数val update = (hadValue: Boolean, oldValue: C) => {// 当有Value时,将oldValue与新的Value kv._2 进行merge// 若没有Value,传入kv._2,生成Valueif (hadValue) mergeValue(oldValue, kv._2) else createCombiner(kv._2)}while (records.hasNext) {addElementsRead()kv = records.next()// 首先使用我们的AppendOnlyMap// 在内存中对value进行聚合 map.changeValue((getPartition(kv._1), kv._1), update)// 超过阈值时写入磁盘maybeSpillCollection(usingMap = true)}} else {// 直接把Value插入缓冲区while (records.hasNext) {addElementsRead()val kv = records.next()buffer.insert(getPartition(kv._1), kv._1, kv._2.asInstanceOf[C])maybeSpillCollection(usingMap = false)}}}

这里的createCombiner我们可以看做用kv._2生成一个Value。而mergeValue我们可以理解成为MapReduce中的combiner,即可以理解为Map端的Reduce操作,先对相同的keyValue进行聚合。

聚合算法

下面我们来深入看看聚合操作部分:

调用栈:

  • util.collection.SizeTrackingAppendOnlyMap.changeValue

    • util.collection.AppendOnlyMap.changeValue

      • util.collection.AppendOnlyMap.incrementSize

        • util.collection.AppendOnlyMap.growTable
    • util.collection.SizeTracker.afterUpdate
      • util.collection.SizeTracker.takeSample

首先是AppendOnlyMapchangeValue函数:

util.collection.SizeTrackingAppendOnlyMap.changeValue

  override def changeValue(key: K, updateFunc: (Boolean, V) => V): V = {// 应用聚合算法得到newValueval newValue = super.changeValue(key, updateFunc)// 更新对 AppendOnlyMap 大小的采样super.afterUpdate()// 返回结果newValue}

util.collection.AppendOnlyMap.changeValue

聚合算法:

  def changeValue(key: K, updateFunc: (Boolean, V) => V): V = {assert(!destroyed, destructionMessage)val k = key.asInstanceOf[AnyRef]if (k.eq(null)) {if (!haveNullValue) {incrementSize()}nullValue = updateFunc(haveNullValue, nullValue)haveNullValue = truereturn nullValue}// 根据k的hashCode在哈希 与 上 掩码 得到 pos// 2*pos 为 k 应该所在的位置// 2*pos + 1 为 k 对应的 v 所在的位置var pos = rehash(k.hashCode) & maskvar i = 1while (true) {// 得到data中k所在的位置上的值curKeyval curKey = data(2 * pos)if (curKey.eq(null)) {// 若curKey为空// 得到根据 kv._2,即单个新值 生成的 newValueval newValue = updateFunc(false, null.asInstanceOf[V])data(2 * pos) = kdata(2 * pos + 1) = newValue.asInstanceOf[AnyRef]// 扩充容量incrementSize()return newValue} else if (k.eq(curKey) || k.equals(curKey)) {// 若k 与 curKey 相等// 将oldValue(data(2 * pos + 1)) 和 新的Value(kv._2) 进行聚合val newValue = updateFunc(true, data(2 * pos + 1).asInstanceOf[V])data(2 * pos + 1) = newValue.asInstanceOf[AnyRef]return newValue} else {// 若curKey 不为null,也和k不想等,// 即 hash 冲突// 则 不断的向后遍历 直到出现前两种情况val delta = ipos = (pos + delta) & maski += 1}}null.asInstanceOf[V] }
  • 1

util.collection.AppendOnlyMap.incrementSize

我们再来看一下扩充容量的实现:

  private def incrementSize() {curSize += 1// 当curSize大于阈值growThreshold时,// 调用growTable()if (curSize > growThreshold) {growTable()}}
  • 1

util.collection.AppendOnlyMap.growTable

  protected def growTable() {生成容量翻倍的newDataval newCapacity = capacity * 2require(newCapacity <= MAXIMUM_CAPACITY, s"Can't contain more than ${growThreshold} elements")val newData = new Array[AnyRef](2 * newCapacity)// 生成newMaskval newMask = newCapacity - 1var oldPos = 0while (oldPos < capacity) {// 将旧的Data 中的数据用newMask重新计算位置,// 复制到新的Data 中 if (!data(2 * oldPos).eq(null)) {val key = data(2 * oldPos)val value = data(2 * oldPos + 1)var newPos = rehash(key.hashCode) & newMaskvar i = 1var keepGoing = truewhile (keepGoing) {val curKey = newData(2 * newPos)if (curKey.eq(null)) {newData(2 * newPos) = keynewData(2 * newPos + 1) = valuekeepGoing = false} else {val delta = inewPos = (newPos + delta) & newMaski += 1}}}oldPos += 1}// 更新data = newDatacapacity = newCapacitymask = newMaskgrowThreshold = (LOAD_FACTOR * newCapacity).toInt}
  • 1

util.collection.SizeTracker.afterUpdate

我们回过头来看SizeTrackingAppendOnlyMap.changeValue中的更新对AppendOnlyMap大小的采样super.afterUpdate()。所谓大小的采样,是只一次UpdateAppendOnlyMap大小的变化量。但是如果在每次如insert``update等操作后就进行计算一次AppendOnlyMap会大大降低性能。所以,这里采用了采样估计的方法:

  protected def afterUpdate(): Unit = {numUpdates += 1// 若numUpdates到达阈值,// 则进行采样if (nextSampleNum == numUpdates) {takeSample()}}
  • 1

util.collection.SizeTracker.takeSample

  private def takeSample(): Unit = {samples.enqueue(Sample(SizeEstimator.estimate(this), numUpdates))// 只用两个采样if (samples.size > 2) {samples.dequeue()}val bytesDelta = samples.toList.reverse match {// 估计出每次更新的变化量case latest :: previous :: tail =>(latest.size - previous.size).toDouble / (latest.numUpdates - previous.numUpdates)// 若小于 2个 样本, 假设没产生变化case _ => 0}// 更新bytesPerUpdate = math.max(0, bytesDelta)// 增大阈值nextSampleNum = math.ceil(numUpdates * SAMPLE_GROWTH_RATE).toLong}
  • 1
  • 10

我们再看来下估计AppendOnlyMap大小的函数:

  def estimateSize(): Long = {assert(samples.nonEmpty)// 计算估计的总变化量val extrapolatedDelta = bytesPerUpdate * (numUpdates - samples.last.numUpdates)// 之前的大小 加上 估计的总变化量(samples.last.size + extrapolatedDelta).toLong}

写缓冲区

现在我们回到insertAll,深入看看如何直接把Value插入缓冲区。

调用栈:

  • util.collection.PartitionedPairBuffer.insert

    • util.collection.PartitionedPairBuffer.growArray

util.collection.PartitionedPairBuffer.insert

  def insert(partition: Int, key: K, value: V): Unit = {// 到了容量大小,调用growArray()if (curSize == capacity) {growArray()}data(2 * curSize) = (partition, key.asInstanceOf[AnyRef])data(2 * curSize + 1) = value.asInstanceOf[AnyRef]curSize += 1afterUpdate()}
  • 1

util.collection.PartitionedPairBuffer.growArray

  private def growArray(): Unit = {if (capacity >= MAXIMUM_CAPACITY) {throw new IllegalStateException(s"Can't insert more than ${MAXIMUM_CAPACITY} elements")}val newCapacity =if (capacity * 2 < 0 || capacity * 2 > MAXIMUM_CAPACITY) { // OverflowMAXIMUM_CAPACITY} else {capacity * 2}// 生成翻倍容量的newArrayval newArray = new Array[AnyRef](2 * newCapacity)// 复制System.arraycopy(data, 0, newArray, 0, 2 * capacity)data = newArraycapacity = newCapacityresetSamples()}
  • 8

溢出

现在我们回到insertAll,深入看看如何将超过阈值时写入磁盘:

调用栈:

  • util.collection.ExternalSorter.maybeSpillCollection

    • util.collection.Spillable.maybeSpill

      • util.collection.Spillable.spill

        • util.collection.ExternalSorter.spillMemoryIteratorToDisk

util.collection.ExternalSorter.maybeSpillCollection

  private def maybeSpillCollection(usingMap: Boolean): Unit = {var estimatedSize = 0Lif (usingMap) {estimatedSize = map.estimateSize()if (maybeSpill(map, estimatedSize)) {map = new PartitionedAppendOnlyMap[K, C]}} else {estimatedSize = buffer.estimateSize()if (maybeSpill(buffer, estimatedSize)) {buffer = new PartitionedPairBuffer[K, C]}}if (estimatedSize > _peakMemoryUsedBytes) {_peakMemoryUsedBytes = estimatedSize}}
  • 5

util.collection.Spillable.maybeSpill

  protected def maybeSpill(collection: C, currentMemory: Long): Boolean = {var shouldSpill = falseif (elementsRead % 32 == 0 && currentMemory >= myMemoryThreshold) {// 若大于阈值// amountToRequest 为要申请的内存空间val amountToRequest = 2 * currentMemory - myMemoryThresholdval granted = acquireMemory(amountToRequest)myMemoryThreshold += granted// 若果我们分配了太小的内存,// 由于 tryToAcquire 返回0// 或者 内存申请大小超过了myMemoryThreshold// 导致 依然 currentMemory >= myMemoryThreshold// 则 shouldSpillshouldSpill = currentMemory >= myMemoryThreshold}// 若元素读取数大于阈值// 则 shouldSpillshouldSpill = shouldSpill || _elementsRead > numElementsForceSpillThresholdif (shouldSpill) {// 跟新 Spill 次数_spillCount += 1logSpillage(currentMemory)// Spill操作spill(collection)// 元素读取数 清零_elementsRead = 0// 增加Spill的内存计数// 释放内存_memoryBytesSpilled += currentMemoryreleaseMemory()}shouldSpill}
  • 1

util.collection.Spillable.spill

将内存中的集合spill到一个有序文件中。之后SortShuffleWriter.write中会调用sorter.writePartitionedFilemerge它们

  override protected[this] def spill(collection: WritablePartitionedPairCollection[K, C]): Unit = {// 生成内存中集合的迭代器,// 这部分我们之后会深入讲解val inMemoryIterator = collection.destructiveSortedWritablePartitionedIterator(comparator)// 生成spill文件,// 并将其加入数组val spillFile = spillMemoryIteratorToDisk(inMemoryIterator)spills += spillFile}
  • 1

util.collection.ExternalSorter.spillMemoryIteratorToDisk

  private[this] def spillMemoryIteratorToDisk(inMemoryIterator: WritablePartitionedIterator): SpilledFile = {
// 生成临时文件 及 blockId val (blockId, file) = diskBlockManager.createTempShuffleBlock()// 这些值在每次flush后会被重置var objectsWritten: Long = 0val spillMetrics: ShuffleWriteMetrics = new ShuffleWriteMetricsval writer: DiskBlockObjectWriter =blockManager.getDiskWriter(blockId, file, serInstance, fileBufferSize, spillMetrics)// 按写入磁盘的顺序记录分支的大小val batchSizes = new ArrayBuffer[Long]// 记录每个分区有多少元素val elementsPerPartition = new Array[Long](numPartitions)// Flush  writer 内容到磁盘,// 并更新相关变量def flush(): Unit = {val segment = writer.commitAndGet()batchSizes += segment.length_diskBytesSpilled += segment.lengthobjectsWritten = 0}var success = falsetry {// 遍历内存集合while (inMemoryIterator.hasNext) {val partitionId = inMemoryIterator.nextPartition()require(partitionId >= 0 && partitionId < numPartitions,s"partition Id: ${partitionId} should be in the range [0, ${numPartitions})")inMemoryIterator.writeNext(writer)elementsPerPartition(partitionId) += 1objectsWritten += 1// 当写入的元素个数 到达 批量序列化尺寸,// flushif (objectsWritten == serializerBatchSize) {flush()}}if (objectsWritten > 0) {// 遍历结束后还有写入// flushflush()} else {writer.revertPartialWritesAndClose()}success = true} finally {if (success) {writer.close()} else {writer.revertPartialWritesAndClose()if (file.exists()) {if (!file.delete()) {logWarning(s"Error deleting ${file}")}}}}SpilledFile(file, blockId, batchSizes.toArray, elementsPerPartition)}
  • 1

排序

我们再在回到,SortShuffleWriter.write中:

      // 在外部排序中,// 有部分结果可能在内存中// 另外部分结果在一个或多个文件中// 需要将它们merge成一个大文件val partitionLengths = sorter.writePartitionedFile(blockId, tmp)

调用栈:

  • util.collection.writePartitionedFile

    • util.collection.ExternalSorter.destructiveSortedWritablePartitionedIterator
    • util.collection.ExternalSorter.partitionedIterator
      • partitionedDestructiveSortedIterator

util.collection.ExternalSorter.writePartitionedFile

我们先来深入看下writePartitionedFile,将数据加入这个ExternalSorter中,写入一个磁盘文件:

  def writePartitionedFile(blockId: BlockId,outputFile: File): Array[Long] = {// 跟踪输出文件的位置val lengths = new Array[Long](numPartitions)val writer = blockManager.getDiskWriter(blockId, outputFile, serInstance, fileBufferSize,context.taskMetrics().shuffleWriteMetrics)if (spills.isEmpty) {// 当只有内存中有数据时val collection = if (aggregator.isDefined) map else bufferval it = collection.destructiveSortedWritablePartitionedIterator(comparator)while (it.hasNext) {val partitionId = it.nextPartition()while (it.hasNext && it.nextPartition() == partitionId) {it.writeNext(writer)}val segment = writer.commitAndGet()lengths(partitionId) = segment.length}} else {// 否则必须进行merge-sort// 得到一个分区迭代器// 并且直接把所有数据写入for ((id, elements) <- this.partitionedIterator) {if (elements.hasNext) {for (elem <- elements) {writer.write(elem._1, elem._2)}val segment = writer.commitAndGet()lengths(id) = segment.length}}}writer.close()context.taskMetrics().incMemoryBytesSpilled(memoryBytesSpilled)context.taskMetrics().incDiskBytesSpilled(diskBytesSpilled)context.taskMetrics().incPeakExecutionMemory(peakMemoryUsedBytes)lengths}
  • 1
  • 28

util.collection.ExternalSorter.destructiveSortedWritablePartitionedIterator

writePartitionedFile使用destructiveSortedWritablePartitionedIterator生成了迭代器:

val it = collection.destructiveSortedWritablePartitionedIterator(comparator)

在上篇博文中提到util.collection.Spillable.spill中也使用到了它:

val inMemoryIterator = collection.destructiveSortedWritablePartitionedIterator(comparator)
  • 1

我们来看下destructiveSortedWritablePartitionedIterator

  def destructiveSortedWritablePartitionedIterator(keyComparator: Option[Comparator[K]]): WritablePartitionedIterator = {// 生成迭代器val it = partitionedDestructiveSortedIterator(keyComparator)new WritablePartitionedIterator {private[this] var cur = if (it.hasNext) it.next() else nulldef writeNext(writer: DiskBlockObjectWriter): Unit = {writer.write(cur._1._2, cur._2)cur = if (it.hasNext) it.next() else null}def hasNext(): Boolean = cur != nulldef nextPartition(): Int = cur._1._1}}
  • 1

可以看到WritablePartitionedIterator相当于partitionedDestructiveSortedIterator所返回的迭代器的代理类。destructiveSortedWritablePartitionedIterator并不返回值,而是将DiskBlockObjectWriter传入,再进行写。我们先把partitionedDestructiveSortedIterator放一下,往下看。

util.collection.ExternalSorter.partitionedIterator

和另外一个分支不同,这个分支是调用partitionedIterator得到分区迭代器,并且直接把所有数据写入。我们来深入看看partitionedIterator

  def partitionedIterator: Iterator[(Int, Iterator[Product2[K, C]])] = {val usingMap = aggregator.isDefinedval collection: WritablePartitionedPairCollection[K, C] = if (usingMap) map else bufferif (spills.isEmpty) {// 当没有spills// 按我们之前的流程 不会 加入这分支if (!ordering.isDefined) {// 若不需要对key排序// 则只对Partition进行排序groupByPartition(destructiveIterator(collection.partitionedDestructiveSortedIterator(None)))} else {// 否则需要对partition和key 进行排序groupByPartition(destructiveIterator(collection.partitionedDestructiveSortedIterator(Some(keyComparator))))}} else {//  当有spills// 需要 Merge spilled出来的那些临时文件 和 内存中的 数据merge(spills, destructiveIterator(collection.partitionedDestructiveSortedIterator(comparator)))}}
  • 1

我们先来看下spills.isEmpty时候,两种排序方式:

  • 只对Partition进行排序:
    partitionedDestructiveSortedIterator中传入的是None,意思是不对key进行排序。对Partition进行排序是默认会在partitionedDestructiveSortedIterator中进行的。我们留在后面讲解。
groupByPartition(destructiveIterator(collection.partitionedDestructiveSortedIterator(None)))
  • 1

Partition排序后,根据Partition的聚合:

  private def groupByPartition(data: Iterator[((Int, K), C)]): Iterator[(Int, Iterator[Product2[K, C]])] ={val buffered = data.buffered(0 until numPartitions).iterator.map(p => (p, new IteratorForPartition(p, buffered)))}
  • 1
  • 2

IteratorForPartition就是对单个partion的迭代器:

  private[this] class IteratorForPartition(partitionId: Int, data: BufferedIterator[((Int, K), C)])extends Iterator[Product2[K, C]]{override def hasNext: Boolean = data.hasNext && data.head._1._1 == partitionIdoverride def next(): Product2[K, C] = {if (!hasNext) {throw new NoSuchElementException}val elem = data.next()(elem._1._2, elem._2)}}
  • 对partition和key进行排序
groupByPartition(destructiveIterator(collection.partitionedDestructiveSortedIterator(Some(keyComparator))))
  • 1

partitionedDestructiveSortedIterator中传入的是keyComparator

  private val keyComparator: Comparator[K] = ordering.getOrElse(new Comparator[K] {override def compare(a: K, b: K): Int = {val h1 = if (a == null) 0 else a.hashCode()val h2 = if (b == null) 0 else b.hashCode()if (h1 < h2) -1 else if (h1 == h2) 0 else 1}})

先根据key的hashCode进行排序,再调用groupByPartitionpartition进行排序。

而对于有spills时,我们使用comparator

  private def comparator: Option[Comparator[K]] = {// 若需要排序 或者 需要 聚合if (ordering.isDefined || aggregator.isDefined) {Some(keyComparator)} else {None}}
  • 1

partitionedDestructiveSortedIterator

好了接下来我们就来看看partitionedDestructiveSortedIteratorpartitionedDestructiveSortedIterator是特质WritablePartitionedPairCollection中的方法。WritablePartitionedPairCollectionPartitionedAppendOnlyMapPartitionedPairBuffer继承。在partitionedIterator中可以看到:

    val usingMap = aggregator.isDefinedval collection: WritablePartitionedPairCollection[K, C] = if (usingMap) map else buffer
  • 1

若需要聚合,则使用PartitionedAppendOnlyMap,否则使用PartitionedPairBuffer

util.collection.PartitionedPairBuffer.partitionedDestructiveSortedIterator

我们先来看下简单点的PartitionedPairBuffer.partitionedDestructiveSortedIterator

  override def partitionedDestructiveSortedIterator(keyComparator: Option[Comparator[K]]): Iterator[((Int, K), V)] = {val comparator = keyComparator.map(partitionKeyComparator).getOrElse(partitionComparator)// 对数据进行排序new Sorter(new KVArraySortDataFormat[(Int, K), AnyRef]).sort(data, 0, curSize, comparator)iterator}
  • 1

我们可以看到上述:

 val comparator = keyComparator.map(partitionKeyComparator).getOrElse(partitionComparator)
  • 1

使用partitionKeyComparator将原来的comparator给替换了。partitionKeyComparator就是partition和key二次排序,如果传入的keyComparatorNone,那就是只对Partition进行排序:

  def partitionKeyComparator[K](keyComparator: Comparator[K]): Comparator[(Int, K)] = {new Comparator[(Int, K)] {override def compare(a: (Int, K), b: (Int, K)): Int = {val partitionDiff = a._1 - b._1if (partitionDiff != 0) {partitionDiff} else {keyComparator.compare(a._2, b._2)}}}
  • 1
  • 2

之后我们使用Sort等对数据进行排序,其中用到了TimSort,在以后博文中,我们会深入讲解。

最后返回迭代器iterator,其实就是简单的按一对一对的去遍历数据:

  private def iterator(): Iterator[((Int, K), V)] = new Iterator[((Int, K), V)] {var pos = 0override def hasNext: Boolean = pos < curSizeoverride def next(): ((Int, K), V) = {if (!hasNext) {throw new NoSuchElementException}val pair = (data(2 * pos).asInstanceOf[(Int, K)], data(2 * pos + 1).asInstanceOf[V])pos += 1pair}}
}
  • 1
  • 6

util.collection.PartitionedAppendOnlyMap.partitionedDestructiveSortedIterator

  def partitionedDestructiveSortedIterator(keyComparator: Option[Comparator[K]]): Iterator[((Int, K), V)] = {val comparator = keyComparator.map(partitionKeyComparator).getOrElse(partitionComparator)destructiveSortedIterator(comparator)}
  • 1

util.collection.PartitionedAppendOnlyMap.destructiveSortedIterator

  def destructiveSortedIterator(keyComparator: Comparator[K]): Iterator[(K, V)] = {destroyed = true// 向左整理var keyIndex, newIndex = 0while (keyIndex < capacity) {if (data(2 * keyIndex) != null) {data(2 * newIndex) = data(2 * keyIndex)data(2 * newIndex + 1) = data(2 * keyIndex + 1)newIndex += 1}keyIndex += 1}assert(curSize == newIndex + (if (haveNullValue) 1 else 0))new Sorter(new KVArraySortDataFormat[K, AnyRef]).sort(data, 0, newIndex, keyComparator)// 返回新的 Iteratornew Iterator[(K, V)] {var i = 0var nullValueReady = haveNullValuedef hasNext: Boolean = (i < newIndex || nullValueReady)def next(): (K, V) = {if (nullValueReady) {nullValueReady = false(null.asInstanceOf[K], nullValue)} else {val item = (data(2 * i).asInstanceOf[K], data(2 * i + 1).asInstanceOf[V])i += 1item}}}}

深入理解Spark 2.1 Core (十):Shuffle Map 端的原理与源码分析相关推荐

  1. 深入理解Spark 2.1 Core (十一):Shuffle Reduce 端的原理与源码分析

    我们曾经在<深入理解Spark 2.1 Core (一):RDD的原理与源码分析 >讲解过: 为了有效地实现容错,RDD提供了一种高度受限的共享内存,即RDD是只读的,并且只能通过其他RD ...

  2. 深入理解Spark 2.1 Core (十二):TimSort 的原理与源码分析

    在博文<深入理解Spark 2.1 Core (十):Shuffle Map 端的原理与源码分析 >中我们提到了: 使用Sort等对数据进行排序,其中用到了TimSort 这篇博文我们就来 ...

  3. 深入理解Spark 2.1 Core (七):Standalone模式任务执行的原理与源码分析

    这篇博文,我们就来讲讲Executor启动后,是如何在Executor上执行Task的,以及其后续处理. 执行Task 我们在<深入理解Spark 2.1 Core (三):任务调度器的原理与源 ...

  4. 深入理解Spark 2.1 Core (六):Standalone模式运行的原理与源码分析

    我们讲到了如何启动Master和Worker,还讲到了如何回收资源.但是,我们没有将AppClient是如何启动的,其实它们的启动也涉及到了资源是如何调度的.这篇博文,我们就来讲一下AppClient ...

  5. 深入理解Spark 2.1 Core (八):Standalone模式容错及HA的原理与源码分析

    第五.第六.第七篇博文,我们讲解了Standalone模式集群是如何启动的,一个App起来了后,集群是如何分配资源,Worker启动Executor的,Task来是如何执行它,执行得到的结果如何处理, ...

  6. 深入理解Spark 2.1 Core (五):Standalone模式运行的原理与源码分析

    概述 前几篇博文都在介绍Spark的调度,这篇博文我们从更加宏观的调度看Spark,讲讲Spark的部署模式.Spark部署模式分以下几种: local 模式 local-cluster 模式 Sta ...

  7. 深入理解Spark 2.1 Core (二):DAG调度器的原理与源码分析

    概述 上一篇<深入理解Spark(一):RDD实现及源码分析 >提到: 定义RDD之后,程序员就可以在动作(注:即action操作)中使用RDD了.动作是向应用程序返回值,或向存储系统导出 ...

  8. 深入理解GO语言:GC原理及源码分析

    Go 中的runtime 类似 Java的虚拟机,它负责管理包括内存分配.垃圾回收.栈处理.goroutine.channel.切片(slice).map 和反射(reflection)等.Go 的可 ...

  9. Spark 随机森林算法原理、源码分析及案例实战

    图 1. Spark 与其它大数据处理工具的活跃程度比较 回页首 环境要求 操作系统:Linux,本文采用的 Ubuntu 10.04,大家可以根据自己的喜好使用自己擅长的 Linux 发行版 Jav ...

最新文章

  1. Flutter 拨打电话和跳转网页
  2. 全球著名音乐抓轨软件EAC 设置详解
  3. SpringMVC对Ajax请求的处理
  4. cent os mysql 内存_Cent OS – MySQL – 主从配置
  5. python点操作符语法,Python 语法之操作符和表达式
  6. ANDROID_SDK_HOME的设置
  7. 微信小程序 短信验证 功能的实现(附案例代码/前后端/直接用)
  8. 12.PS-渐变工具组
  9. 从删库到跑路,论运维的自我修养
  10. 两台无线路由桥接教程
  11. 网吧计算机配置特点,揭秘:网吧电脑配置很低,却怎么用也不卡顿,这是为什么呢?...
  12. 官方消息:即将开始退钱
  13. 已有一个已排好序的数组,要求输入一个数,将它插入数组中,保持数组依然有序。
  14. 纯CSS边框渐变动画
  15. Node.js:npm install时出错 check python checking for Python executable “python2“ in the PATH
  16. springboot+服装销售管理系统的设计与实现 毕业设计-附源码221801
  17. Globeimposter-Alpha865qqz勒索病毒数据恢复|金蝶、用友、管家婆、OA、速达、ERP等软件数据库恢复
  18. 凡尔赛文学介绍计算机专业,凡尔赛文学什么梗?凡尔赛文学意思出处介绍
  19. 操作系统-先到先服务和短时间优先算法-C语言
  20. 原创 | 微信小游戏“跳一跳”改分攻略!

热门文章

  1. 微信小程序开发与应用 第一章 微信小程序的基本知识1
  2. 模拟k8s项目的生命周期
  3. 多维列表索引_10分钟带你学会Pandas多层级索引
  4. python meshgrid_torch.meshgrid()和np.meshgrid()的区别
  5. 国外学校css profile,CSS Print Profile
  6. pythonargmaxaxis1_keras.argmax中axis = -1的含义是什么?
  7. 企业系统门户需要哪些模块_灵活用工平台SAAS系统有哪些功能模块
  8. java threadstatus_Thread之一:线程生命周期及六种状态
  9. qos 流控功能_怎么设置飞鱼星QoS流量控制中的传统流控
  10. 大连开发区中老年运动微信群_消暑!大连近郊最受欢迎的海滨浴场集合来啦