A Deep Dive into Spark's "Locality level" (Spark 2.3.0)

1. The Locality level can be viewed in the Spark job UI.

2. What Locality level means

Note: to avoid any misreading, the original English documentation is quoted below.

Data locality can have a major impact on the performance of Spark jobs. If data and the code that operates on it are together then computation tends to be fast. But if code and data are separated, one must move to the other. Typically it is faster to ship serialized code from place to place than a chunk of data because code size is much smaller than data. Spark builds its scheduling around this general principle of data locality.

Data locality is how close data is to the code processing it. There are several levels of locality based on the data’s current location. In order from closest to farthest:

  • PROCESS_LOCAL data is in the same JVM as the running code. This is the best locality possible
  • NODE_LOCAL data is on the same node. Examples might be in HDFS on the same node, or in another executor on the same node. This is a little slower than PROCESS_LOCAL because the data has to travel between processes
  • NO_PREF data is accessed equally quickly from anywhere and has no locality preference
  • RACK_LOCAL data is on the same rack of servers. Data is on a different server on the same rack so needs to be sent over the network, typically through a single switch
  • ANY data is elsewhere on the network and not in the same rack

Spark prefers to schedule all tasks at the best locality level, but this is not always possible. In situations where there is no unprocessed data on any idle executor, Spark switches to lower locality levels. There are two options: a) wait until a busy CPU frees up to start a task on data on the same server, or b) immediately start a new task in a farther away place that requires moving data there.

What Spark typically does is wait a bit in the hopes that a busy CPU frees up. Once that timeout expires, it starts moving the data from far away to the free CPU. The wait timeout for fallback between each level can be configured individually or all together in one parameter; see the spark.locality parameters on the configuration page for details. You should increase these settings if your tasks are long and see poor locality, but the default usually works well.

Data locality levels ordered from best to worst: PROCESS_LOCAL > NODE_LOCAL > NO_PREF > RACK_LOCAL > ANY

Source (org.apache.spark.scheduler.TaskLocality):

package org.apache.spark.scheduler

import org.apache.spark.annotation.DeveloperApi

@DeveloperApi
object TaskLocality extends Enumeration {
  // Process local is expected to be used ONLY within TaskSetManager for now.
  val PROCESS_LOCAL, NODE_LOCAL, NO_PREF, RACK_LOCAL, ANY = Value

  type TaskLocality = Value

  def isAllowed(constraint: TaskLocality, condition: TaskLocality): Boolean = {
    condition <= constraint
  }
}
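
To make the ordering concrete, here is a small, hedged example (assuming spark-core is on the classpath so the @DeveloperApi object is reachable; the demo object name is made up):

import org.apache.spark.scheduler.TaskLocality
import org.apache.spark.scheduler.TaskLocality._

// Enumeration values are declared best-to-worst, so a "smaller" level is a better one.
// isAllowed(constraint, condition) answers: may a task at locality `condition`
// be launched when the scheduler currently allows at most `constraint`?
object TaskLocalityOrderingDemo {
  def main(args: Array[String]): Unit = {
    println(TaskLocality.isAllowed(NODE_LOCAL, PROCESS_LOCAL)) // true  (PROCESS_LOCAL is better)
    println(TaskLocality.isAllowed(NODE_LOCAL, RACK_LOCAL))    // false (RACK_LOCAL is worse)
    println(PROCESS_LOCAL < ANY)                               // true  (ordering follows declaration order)
  }
}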

3. Source code involved in Spark task scheduling (related to Locality level)

(1) Source classes

  • org.apache.spark.rdd.RDD
  • org.apache.spark.scheduler.DAGScheduler
  • org.apache.spark.scheduler.TaskSetManager
  • org.apache.spark.scheduler.TaskSet
  • org.apache.spark.scheduler.TaskSchedulerImpl
  • org.apache.spark.scheduler.TaskLocation

(2) Key methods

  • dequeueTask
  • computeValidLocalityLevels
  • hasExecutorsAliveOnHost
  • getPreferredLocations
  • dequeueSpeculativeTask

If you have many tasks and they run at a poor locality level, you can raise this parameter appropriately:

spark.locality.wait (default: 3s)

Official description of this parameter:

How long to wait to launch a data-local task before giving up and launching it on a less-local node. The same wait will be used to step through multiple locality levels (process-local, node-local, rack-local and then any). It is also possible to customize the waiting time for each level by setting spark.locality.wait.node, etc. You should increase this setting if your tasks are long and see poor locality, but the default usually works well.
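
For reference, a minimal sketch of setting these waits programmatically; the values below are illustrative, not recommendations:

import org.apache.spark.SparkConf
import org.apache.spark.sql.SparkSession

// Global wait plus optional per-level overrides; tune based on task duration
// and the locality levels you actually observe in the job UI.
val conf = new SparkConf()
  .set("spark.locality.wait", "6s")          // fallback wait between locality levels
  .set("spark.locality.wait.process", "6s")
  .set("spark.locality.wait.node", "3s")
  .set("spark.locality.wait.rack", "1s")

val spark = SparkSession.builder()
  .appName("locality-wait-demo")
  .config(conf)
  .getOrCreate()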

4. How Spark's DAG task scheduling arrives at the best locality level

(1) An RDD's PreferredLocations

The RDD method getPreferredLocations returns the preferred locations of a partition:

/**
 * Optionally overridden by subclasses to specify placement preferences.
 */
protected def getPreferredLocations(split: Partition): Seq[String] = Nil

1) The return type is Seq[String]; in practice it corresponds to Seq[TaskLocation].

2) TaskLocation#toString is applied before the locations are returned.

3) TaskLocation is a trait with three implementations, each representing data cached in a different kind of location:

/**
 * 1. The data is stored in an executor's memory, i.e. this partition is cached in memory.
 */
private [spark]
case class ExecutorCacheTaskLocation(override val host: String, executorId: String)
  extends TaskLocation {
  override def toString: String = s"${TaskLocation.executorLocationTag}${host}_$executorId"
}

/**
 * 2. The data is stored on the disk of the given host.
 */
private [spark] case class HostTaskLocation(override val host: String) extends TaskLocation {
  override def toString: String = host
}

/**
 * 3. The data is cached by HDFS (Hadoop Distributed File System).
 */
private [spark] case class HDFSCacheTaskLocation(override val host: String) extends TaskLocation {
  override def toString: String = TaskLocation.inMemoryLocationTag + host
}
  • ExecutorCacheTaskLocation: the partition data is already cached in executor memory (for example, KafkaRDD caches its partitions in memory); its toString returns a string of the form executor_$host_$executorId.
  • HostTaskLocation: the partition data lives on a node's disk (and not on HDFS); its toString simply returns the host.
  • HDFSCacheTaskLocation: the partition data lives on HDFS, e.g. the partitions of a HadoopRDD loaded from HDFS; its toString returns a string of the form hdfs_cache_$host.

Note:

Different RDDs have different preferred locations, so their tasks run in different places, and the location strings they return use different formats, each with its own prefix.

// We identify hosts on which the block is cached with this prefix.  Because this prefix contains
// underscores, which are not legal characters in hostnames, there should be no potential for
// confusion.  See  RFC 952 and RFC 1123 for information about the format of hostnames.
val inMemoryLocationTag = "hdfs_cache_"

// Identify locations of executors with this prefix.
val executorLocationTag = "executor_"

def apply(host: String, executorId: String): TaskLocation = {
  new ExecutorCacheTaskLocation(host, executorId)
}

These location formats give the TaskSetManager the information it needs to compute the best locality of its tasks.
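
To illustrate how preferred locations flow from an RDD into scheduling, here is a minimal, hypothetical RDD that pins each partition to a host (the class name and hosts are made up; only getPreferredLocations is the point):

import org.apache.spark.{Partition, SparkContext, TaskContext}
import org.apache.spark.rdd.RDD

// Hypothetical example: each partition declares one preferred host, so the scheduler
// will try to run the corresponding task NODE_LOCAL on that host.
class HostPinnedRDD(sc: SparkContext, hosts: Seq[String]) extends RDD[Int](sc, Nil) {

  private case class PinnedPartition(index: Int, host: String) extends Partition

  override protected def getPartitions: Array[Partition] =
    hosts.zipWithIndex.map { case (h, i) => PinnedPartition(i, h) }.toArray

  // Returning a plain hostname corresponds to HostTaskLocation (at best NODE_LOCAL);
  // returning "executor_<host>_<id>" would correspond to ExecutorCacheTaskLocation.
  override protected def getPreferredLocations(split: Partition): Seq[String] =
    Seq(split.asInstanceOf[PinnedPartition].host)

  override def compute(split: Partition, context: TaskContext): Iterator[Int] =
    Iterator(split.index)
}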

(2) DAGScheduler builds the taskSet

DAGScheduler submits a stage's tasks via submitStage, which calls submitMissingTasks. submitMissingTasks uses the code below to determine the preferredLocations of every task that needs to be computed; it calls RDD#getPreferredLocs, and the preferred locations returned for a partition become the preferred locations of the task that computes that partition.
/**
 * Note: when DAGScheduler builds the taskSet to be submitted to the TaskScheduler for
 * this stage, each task in the taskSet has the same preferred locations as the
 * partition it computes.
 */
private def submitMissingTasks(stage: Stage, jobId: Int) {
  logDebug("submitMissingTasks(" + stage + ")")

  // First figure out the indexes of partition ids to compute.
  val partitionsToCompute: Seq[Int] = stage.findMissingPartitions()

  // Use the scheduling pool, job group, description, etc. from an ActiveJob associated
  // with this Stage
  val properties = jobIdToActiveJob(jobId).properties

  runningStages += stage
  // SparkListenerStageSubmitted should be posted before testing whether tasks are
  // serializable. If tasks are not serializable, a SparkListenerStageCompleted event
  // will be posted, which should always come after a corresponding SparkListenerStageSubmitted
  // event.
  stage match {
    case s: ShuffleMapStage =>
      outputCommitCoordinator.stageStart(stage = s.id, maxPartitionId = s.numPartitions - 1)
    case s: ResultStage =>
      outputCommitCoordinator.stageStart(stage = s.id, maxPartitionId = s.rdd.partitions.length - 1)
  }

  // The returned taskIdToLocations: Map[Int, Seq[TaskLocation]] (i.e. taskId -> Seq[hosts])
  // is used below when building the taskSet: Seq[Task[_]] submitted to the TaskScheduler.
  val taskIdToLocations: Map[Int, Seq[TaskLocation]] = try {
    stage match {
      case s: ShuffleMapStage =>
        partitionsToCompute.map { id => (id, getPreferredLocs(stage.rdd, id)) }.toMap
      case s: ResultStage =>
        partitionsToCompute.map { id =>
          val p = s.partitions(id)
          (id, getPreferredLocs(stage.rdd, p))
        }.toMap
    }
  } catch {
    case NonFatal(e) =>
      stage.makeNewStageAttempt(partitionsToCompute.size)
      listenerBus.post(SparkListenerStageSubmitted(stage.latestInfo, properties))
      abortStage(stage, s"Task creation failed: $e\n${Utils.exceptionString(e)}", Some(e))
      runningStages -= stage
      return
  }

  stage.makeNewStageAttempt(partitionsToCompute.size, taskIdToLocations.values.toSeq)

  // If there are tasks to execute, record the submission time of the stage. Otherwise,
  // post the event without the submission time, which indicates that this stage was
  // skipped.
  if (partitionsToCompute.nonEmpty) {
    stage.latestInfo.submissionTime = Some(clock.getTimeMillis())
  }
  listenerBus.post(SparkListenerStageSubmitted(stage.latestInfo, properties))

  // TODO: Maybe we can keep the taskBinary in Stage to avoid serializing it multiple times.
  // Broadcasted binary for the task, used to dispatch tasks to executors. Note that we broadcast
  // the serialized copy of the RDD and for each task we will deserialize it, which means each
  // task gets a different copy of the RDD. This provides stronger isolation between tasks that
  // might modify state of objects referenced in their closures. This is necessary in Hadoop
  // where the JobConf/Configuration object is not thread-safe.
  var taskBinary: Broadcast[Array[Byte]] = null
  var partitions: Array[Partition] = null
  try {
    // For ShuffleMapTask, serialize and broadcast (rdd, shuffleDep).
    // For ResultTask, serialize and broadcast (rdd, func).
    var taskBinaryBytes: Array[Byte] = null
    // taskBinaryBytes and partitions are both affected by the checkpoint status. We need
    // this synchronization in case another concurrent job is checkpointing this RDD, so we get a
    // consistent view of both variables.
    RDDCheckpointData.synchronized {
      taskBinaryBytes = stage match {
        case stage: ShuffleMapStage =>
          JavaUtils.bufferToArray(closureSerializer.serialize((stage.rdd, stage.shuffleDep): AnyRef))
        case stage: ResultStage =>
          JavaUtils.bufferToArray(closureSerializer.serialize((stage.rdd, stage.func): AnyRef))
      }
      partitions = stage.rdd.partitions
    }
    taskBinary = sc.broadcast(taskBinaryBytes)
  } catch {
    // In the case of a failure during serialization, abort the stage.
    case e: NotSerializableException =>
      abortStage(stage, "Task not serializable: " + e.toString, Some(e))
      runningStages -= stage
      return  // Abort execution
    case NonFatal(e) =>
      abortStage(stage, s"Task serialization failed: $e\n${Utils.exceptionString(e)}", Some(e))
      runningStages -= stage
      return
  }

  val tasks: Seq[Task[_]] = try {
    val serializedTaskMetrics = closureSerializer.serialize(stage.latestInfo.taskMetrics).array()
    stage match {
      case stage: ShuffleMapStage =>
        stage.pendingPartitions.clear()
        partitionsToCompute.map { id =>
          val locs = taskIdToLocations(id)
          val part = partitions(id)
          stage.pendingPartitions += id
          // Use locs, the preferred locations obtained above, to construct the ShuffleMapTask
          new ShuffleMapTask(stage.id, stage.latestInfo.attemptNumber,
            taskBinary, part, locs, properties, serializedTaskMetrics, Option(jobId),
            Option(sc.applicationId), sc.applicationAttemptId)
        }
      case stage: ResultStage =>
        partitionsToCompute.map { id =>
          val p: Int = stage.partitions(id)
          val part = partitions(p)
          val locs = taskIdToLocations(id)
          // Use locs, the preferred locations obtained above, to construct the ResultTask
          new ResultTask(stage.id, stage.latestInfo.attemptNumber,
            taskBinary, part, locs, id, properties, serializedTaskMetrics,
            Option(jobId), Option(sc.applicationId), sc.applicationAttemptId)
        }
    }
  } catch {
    case NonFatal(e) =>
      abortStage(stage, s"Task creation failed: $e\n${Utils.exceptionString(e)}", Some(e))
      runningStages -= stage
      return
  }

  if (tasks.size > 0) {
    logInfo(s"Submitting ${tasks.size} missing tasks from $stage (${stage.rdd}) (first 15 " +
      s"tasks are for partitions ${tasks.take(15).map(_.partitionId)})")
    taskScheduler.submitTasks(new TaskSet(
      tasks.toArray, stage.id, stage.latestInfo.attemptNumber, jobId, properties))
  } else {
    // Because we posted SparkListenerStageSubmitted earlier, we should mark
    // the stage as completed here in case there are no tasks to run
    markStageAsFinished(stage, None)

    val debugString = stage match {
      case stage: ShuffleMapStage =>
        s"Stage ${stage} is actually done; " +
          s"(available: ${stage.isAvailable}," +
          s"available outputs: ${stage.numAvailableOutputs}," +
          s"partitions: ${stage.numPartitions})"
      case stage: ResultStage =>
        s"Stage ${stage} is actually done; (partitions: ${stage.numPartitions})"
    }
    logDebug(debugString)

    submitWaitingChildStages(stage)
  }
}

(3) Constructing the TaskSetManager and determining its locality levels

After the DAGScheduler submits a taskSet to the TaskScheduler, TaskSchedulerImpl creates a TaskSetManager for it. This object holds all of the taskSet's tasks and manages their execution, which includes computing the locality levels at which the tasks can run, so that this information can be used for scheduling and delay scheduling.
When the TaskSetManager is constructed, it calls var myLocalityLevels = computeValidLocalityLevels() to determine the locality levels:

/**
 * Track the set of locality levels which are valid given the tasks locality preferences and
 * the set of currently available executors.  This is updated as executors are added and removed.
 * This allows a performance optimization, of skipping levels that aren't relevant (eg., skip
 * PROCESS_LOCAL if no tasks could be run PROCESS_LOCAL for the current set of executors).
 */
private[scheduler] var myLocalityLevels = computeValidLocalityLevels()

The source of computeValidLocalityLevels:

/**
 * Compute the locality levels used in this TaskSet, assuming all tasks have already
 * been added to the pending queues via addPendingTask.
 */
private def computeValidLocalityLevels(): Array[TaskLocality.TaskLocality] = {
  import TaskLocality.{PROCESS_LOCAL, NODE_LOCAL, NO_PREF, RACK_LOCAL, ANY}
  val levels = new ArrayBuffer[TaskLocality.TaskLocality]
  if (!pendingTasksForExecutor.isEmpty &&
      pendingTasksForExecutor.keySet.exists(sched.isExecutorAlive(_))) {
    levels += PROCESS_LOCAL
  }
  if (!pendingTasksForHost.isEmpty &&
      pendingTasksForHost.keySet.exists(sched.hasExecutorsAliveOnHost(_))) {
    levels += NODE_LOCAL
  }
  if (!pendingTasksWithNoPrefs.isEmpty) {
    levels += NO_PREF
  }
  if (!pendingTasksForRack.isEmpty &&
      pendingTasksForRack.keySet.exists(sched.hasHostAliveOnRack(_))) {
    levels += RACK_LOCAL
  }
  levels += ANY
  logDebug("Valid locality levels for " + taskSet + ": " + levels.mkString(", "))
  levels.toArray
}

This function determines:

  1. whether the taskSetManager's locality levels include "PROCESS_LOCAL"
  2. whether the taskSetManager's locality levels include "NODE_LOCAL"
  3. whether the taskSetManager's locality levels include "NO_PREF"
  4. whether the taskSetManager's locality levels include "RACK_LOCAL"

(4) The four checks in detail

1) Do the taskSetManager's locality levels include PROCESS_LOCAL?

//     Check for PROCESS_LOCAL
//     Key expression: pendingTasksForExecutor.keySet.exists(sched.isExecutorAlive(_))
    if (!pendingTasksForExecutor.isEmpty &&
        pendingTasksForExecutor.keySet.exists(sched.isExecutorAlive(_))) {
      levels += PROCESS_LOCAL
    }

Two things to understand:

  • What is pendingTasksForExecutor?
  • What does sched.isExecutorAlive(_) do?

What is pendingTasksForExecutor?

pendingTasksForExecutor is created in the TaskSetManager constructor:

// Set of pending tasks for each executor. These collections are actually
// treated as stacks, in which new tasks are added to the end of the
// ArrayBuffer and removed from the end. This makes it faster to detect
// tasks that repeatedly fail because whenever a task failed, it is put
// back at the head of the stack. These collections may contain duplicates
// for two reasons:
// (1): Tasks are only removed lazily; when a task is launched, it remains
// in all the pending lists except the one that it was launched from.
// (2): Tasks may be re-added to these lists multiple times as a result
// of failures.
// Duplicates are handled in dequeueTaskFromList, which ensures that a
// task hasn't already started running before launching it.
private val pendingTasksForExecutor = new HashMap[String, ArrayBuffer[Int]]

Note: the key is the executor id; the value is an array of task indexes.
The TaskSetManager constructor then calls:

// Add all our tasks to the pending lists. We do this in reverse order
// of task index so that tasks with low indices get launched first.
for (i <- (0 until numTasks).reverse) {
  addPendingTask(i)
}

Key point: pendingTasksForExecutor maps each currently available executor to the tasks whose partition data is cached in that executor's memory.
Explanation:
This loop calls addPendingTask for every task in the taskSetManager whose preferred-location type is ExecutorCacheTaskLocation (matched via the format of its toString output). addPendingTask reads the task's preferred locations, a Seq[String], resolves the executors behind those locations, and thereby builds the reverse mapping from executor to the tasks whose partitions are cached in its memory, i.e. pendingTasksForExecutor (a simplified sketch of this bucketing follows).
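
The real addPendingTask lives in TaskSetManager; the standalone sketch below is not Spark's code and only illustrates how location strings could be bucketed into the pending maps by their prefixes:

import scala.collection.mutable.{ArrayBuffer, HashMap}

// Illustration only: bucket preferred-location strings into executor- and host-keyed maps.
object PendingTaskBucketsSketch {
  val pendingTasksForExecutor = new HashMap[String, ArrayBuffer[Int]]
  val pendingTasksForHost     = new HashMap[String, ArrayBuffer[Int]]

  def addPendingTask(index: Int, preferredLocations: Seq[String]): Unit = {
    for (loc <- preferredLocations) {
      if (loc.startsWith("executor_")) {
        // "executor_<host>_<executorId>" -> ExecutorCacheTaskLocation
        val Array(host, executorId) = loc.stripPrefix("executor_").split("_", 2)
        pendingTasksForExecutor.getOrElseUpdate(executorId, new ArrayBuffer[Int]) += index
        pendingTasksForHost.getOrElseUpdate(host, new ArrayBuffer[Int]) += index
      } else {
        // "hdfs_cache_<host>" or a plain host -> HDFSCacheTaskLocation / HostTaskLocation
        pendingTasksForHost.getOrElseUpdate(loc.stripPrefix("hdfs_cache_"), new ArrayBuffer[Int]) += index
      }
    }
  }

  def main(args: Array[String]): Unit = {
    addPendingTask(0, Seq("executor_host1_3"))
    addPendingTask(1, Seq("host2"))
    println(pendingTasksForExecutor) // e.g. Map(3 -> ArrayBuffer(0))
    println(pendingTasksForHost)     // e.g. Map(host1 -> ArrayBuffer(0), host2 -> ArrayBuffer(1))
  }
}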

What does sched.isExecutorAlive(_) do?

Implementation (in TaskSchedulerImpl):

def isExecutorAlive(execId: String): Boolean = synchronized {
  executorIdToRunningTaskIds.contains(execId)
}

This map holds the ids of all executors currently available in the cluster (there is no requirement on an executor's free cores, which may be 0):

// IDs of the tasks running on each executor. (variable defined in spark-2.x.x)
private val executorIdToRunningTaskIds = new HashMap[String, HashSet[Long]]

// Which executor IDs we have executors on. (variable defined before spark-1.6.x)
val activeExecutorIds = new HashSet[String]

Whenever the DAGScheduler submits a taskSet, the TaskScheduler ends up calling resourceOffers, which records the currently available executors into activeExecutorIds (executorIdToRunningTaskIds in 2.x);

when an executor is lost, TaskSchedulerImpl calls removeExecutor to remove it from activeExecutorIds.

Conclusion:

  • isExecutorAlive checks whether the given executor id is currently active.
  • The meaning of pendingTasksForExecutor.keySet.exists(sched.isExecutorAlive(_)):

Among all executors that hold, cached in memory, the partition data of this taskSetManager's tasks, is at least one active? If so, return true; otherwise return false. That is how Spark decides whether a taskSetManager's locality levels include PROCESS_LOCAL.

2) Do the taskSetManager's locality levels include NODE_LOCAL?

//     Check for NODE_LOCAL
//     Key expression: pendingTasksForHost.keySet.exists(sched.hasExecutorsAliveOnHost(_))
    if (!pendingTasksForHost.isEmpty &&
        pendingTasksForHost.keySet.exists(sched.hasExecutorsAliveOnHost(_))) {
      levels += NODE_LOCAL
    }

Key methods:

pendingTasksForHost.keySet.exists(??)

// Set of pending tasks for each host. Similar to pendingTasksForExecutor,
// but at host level.
// key is the host; value is the array of indexes of tasks whose preferredLocations include that host
private val pendingTasksForHost = new HashMap[String, ArrayBuffer[Int]]

sched.hasExecutorsAliveOnHost(??)

def hasExecutorsAliveOnHost(host: String): Boolean = synchronized {
  hostToExecutors.contains(host)
}

// The set of executors we have on each host; this is used to compute hostsAlive, which
// in turn is used to decide when we can attain data locality on a given host
// hostToExecutors is a HashMap[String, HashSet[String]]: key is the host, value is the set of active executors on that host
protected val hostToExecutors = new HashMap[String, HashSet[String]]

How to tell whether the taskSetManager's locality levels include NODE_LOCAL:

Among all hosts that appear as preferred locations of this taskSetManager's tasks, does at least one currently have an alive executor? If so, return true; otherwise return false.

3) Do the taskSetManager's locality levels include RACK_LOCAL?

//     Check for RACK_LOCAL
//     Key expression: pendingTasksForRack.keySet.exists(sched.hasHostAliveOnRack(_))
    if (!pendingTasksForRack.isEmpty &&
        pendingTasksForRack.keySet.exists(sched.hasHostAliveOnRack(_))) {
      levels += RACK_LOCAL
    }

Key methods:

pendingTasksForRack.keySet.exists(??)

// Set of pending tasks for each rack -- similar to the above.
// key is the rack; value is the tasks whose preferred-location hosts belong to that rack
private val pendingTasksForRack = new HashMap[String, ArrayBuffer[Int]]

hasHostAliveOnRack(??)

def hasHostAliveOnRack(rack: String): Boolean = synchronized {
  hostsByRack.contains(rack)
}

// key is the rack; value is the set of alive hosts on that rack (hosts with active executors)
protected val hostsByRack = new HashMap[String, HashSet[String]]

How to tell whether the taskSetManager's locality levels include RACK_LOCAL:

Do the racks containing the preferred-location hosts of this taskSetManager's tasks intersect with the racks where active executors currently live? If so, return true; otherwise return false.

4) Do the taskSetManager's locality levels include NO_PREF?

//     Check for NO_PREF
//     Key expression: pendingTasksWithNoPrefs.isEmpty
    if (!pendingTasksWithNoPrefs.isEmpty) {
      levels += NO_PREF
    }

How to tell whether the taskSetManager's locality levels include NO_PREF:
If some partitions of an RDD have no preferred locations (for example, an RDD built from an in-memory collection when the executors are not on the same node as the driver), then the taskSetManagers produced by actions on that RDD include NO_PREF among their locality levels (a quick illustration follows).
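
A hedged illustration, assuming an existing SparkContext named sc and a hypothetical HDFS path:

// RDD built from an in-memory collection: no preferred locations, so its tasks run as NO_PREF.
val noPrefRdd = sc.parallelize(1 to 1000)
println(noPrefRdd.preferredLocations(noPrefRdd.partitions(0)))  // typically List()

// RDD read from HDFS: block locations become preferred hosts, enabling NODE_LOCAL.
val hdfsRdd = sc.textFile("hdfs:///some/path")                  // hypothetical path
println(hdfsRdd.preferredLocations(hdfsRdd.partitions(0)))      // e.g. List(host1, host2, host3)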

5) Every taskSetManager's locality levels include ANY.

5. Tuning takeaways from the analysis above

(1) Try to keep the data in the same JVM as the code that processes it.

(2) Find the data source and the parent RDDs: walk up the RDD's narrow dependencies, parent by parent, until you reach the first RDD in the chain, which is where the data is actually read.

(3) Use persist on the parent RDD where stages are split: without caching, tasks cannot be PROCESS_LOCAL and are NODE_LOCAL at best (a minimal sketch follows).
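
A minimal sketch of this idea, assuming a hypothetical job and input path:

import org.apache.spark.sql.SparkSession
import org.apache.spark.storage.StorageLevel

// Hypothetical job: cache the parent RDD so that downstream stages can be scheduled
// PROCESS_LOCAL on the executors that hold the cached blocks.
object LocalityTuningSketch {
  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder().appName("locality-tuning-sketch").getOrCreate()
    val sc = spark.sparkContext

    val source = sc.textFile("hdfs:///data/events")            // hypothetical path
    val parsed = source.map(_.split(",")).persist(StorageLevel.MEMORY_ONLY)
    parsed.count()                                             // materialize the cache

    // Partitions of `parsed` now have ExecutorCacheTaskLocation preferred locations,
    // so this stage's tasks can run PROCESS_LOCAL instead of NODE_LOCAL.
    println(parsed.map(_.length).sum())

    spark.stop()
  }
}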

References:

  • Spark source: https://github.com/apache/spark/blob/v2.3.0/core/src/main/scala/org/apache/spark/scheduler/TaskSetManager.scala
  • Spark tuning guide: https://spark.apache.org/docs/2.3.0/tuning.html
  • Stack Overflow: https://stackoverflow.com/questions/26994025/whats-the-meaning-of-locality-levelon-spark-cluster (not spark-2.x)
  • spark-1.5.x source: https://github.com/apache/spark/blob/branch-1.5/core/src/main/scala/org/apache/spark/scheduler/TaskSchedulerImpl.scala
  • Jianshu: https://www.jianshu.com/p/05034a9c8cae (spark-1.5.x)
  • http://coolplayer.net/2017/05/02/%E8%B0%88%E8%B0%88spark-%E7%9A%84%E8%AE%A1%E7%AE%97%E6%9C%AC%E5%9C%B0%E6%80%A7/
