上篇文章我们介绍到,SparkContext初始化的时候,在TaskScheduler启动阶段会向Master发送RegisterApplication事件,下面我们进入Master主类,从它接收到该事件后接着分析。

另外由于这里的代码不好远程debug,所以本篇文章的讲解不会涉及到debug过程,纯纯的是枯燥的

源码解析。

    case RegisterApplication(description, driver) =>// 如果当前master的状态是standby(备用的),也就是当前master不是active master,那么application来请求注册,什么都不会干if (state == RecoveryState.STANDBY) {} else {logInfo("Registering app " + description.name)//根据 Driver( TaskScheduler 启动阶段生成的RPC节点 )传递过来参数创建ApplicationInfo对象val app = createApplication(description, driver)//重点一:将application注册到master上registerApplication(app)logInfo("Registered app " + description.name + " with ID " + app.id)//将application信息持久化存储下来persistenceEngine.addApplication(app)//重点二:向Driver( TaskScheduler 启动阶段生成的RPC节点 )发送Application注册的响应driver.send(RegisteredApplication(app.id, self))//重点三:调用master的调度方法schedule()}

可以看到master接收到RegisterApplication事件之后,会先注册application的信息,随后响应driver,最后调用master的任务调度方法。下面我们分别看一下:

重点一:将application注册到master上

这一步没什么难点,主要是将application相关的信息登记到master的各种集合中:

  private def registerApplication(app: ApplicationInfo): Unit = {//获取driver地址信息val appAddress = app.driver.addressif (addressToApp.contains(appAddress)) {logInfo("Attempted to re-register application at same address: " + appAddress)return}//在application的度量系统中记录该application的信息applicationMetricsSystem.registerSource(app.appSource)//将application添加到各种信息记录的集合中apps += appidToApp(app.id) = appendpointToApp(app.driver) = appaddressToApp(appAddress) = app//将application添加到等待队列中(重点)waitingApps += app}

注册代码很少,主要留心下最后一步将application加入等待队列即可,加入等待队列后,后面就会被master进行调度处理。这块的具体处理我们再重点三中进行讲述。

重点二:向Driver( TaskScheduler 启动阶段生成的RPC节点 )发送Application注册的响应

在看源码前,可以先留心下master中的driver rpc节点是指什么,通过前面的文章我们可以知道driver节点实际是TaskScheduler 启动阶段生成的RPC节点,对于我们的案例来说,其实就是StandaloneAppClient对象节点。了解了这些概念,后面我们看源码也会清晰很多。

就比如说这里向Driver发送RegisteredApplication响应,接收事件的处理逻辑是什么,我们如果想知道,就必须知道这个Driver节点到底是什么。发送事件没什么好说的,这里我们直接看下StandaloneAppClient的响应逻辑

case RegisteredApplication(appId_, masterRef) =>//记录appIdappId.set(appId_)//标识app已被注册过registered.set(true)//存储master RPC节点的引用master = Some(masterRef)//通知LanchServer当前app的appidlistener.connected(appId.get)

可以看到响应逻辑很简单,就是一些常见的标识修改,信息记录等,关键的是如何快速定位到所发送的RPC节点。本案例是StandaloneAppClient接收,如果是cluster部署,还是吗?答案肯定为不是。

重点三:调用master的调度方法(核心,重中之重)

master的schedule方法是master的核心方法,它为等待的Application安排当前可用的资源因此每次有新应用加入或资源可用性发生变化时,都会调用此方法。另外在阅读源码前,有几个小点留意下,这样更便于理解源码:

1)driver的调度一定要在executor之前,因为executor需要向driver创建过程中TaskScheduler注册自己的信息。

2)waitting driver遍历时,一开始会挑选ALIVE状态的worker并随机排列,随后则是依次查看worker是否可以启动driver。如果还有未启动的driver,且还有worker没有被遍历到则继续遍历worker尝试在其上启动driver。通过这个代码逻辑我们也可以看出来,每个worker默认是至多运行一个driver。

private def schedule(): Unit = {//如果当前节点不是ALIVE活跃状态,则直接退出if (state != RecoveryState.ALIVE) {return}// 将spark集群中可用的worker随机打乱排列val shuffledAliveWorkers = Random.shuffle(workers.toSeq.filter(_.state == WorkerState.ALIVE))//记录活跃Worker的数量val numWorkersAlive = shuffledAliveWorkers.sizevar curPos = 0//遍历所有等待的driver,在资源足够时发起driver的启动构建//对于每个driver,我们依次遍历可用的worker,尝试在worker上发起driver的运行for (driver <- waitingDrivers.toList) { var launched = falsevar isClusterIdle = true//记录已经访问过的worker数var numWorkersVisited = 0//当还有剩余worker没有遍历到,且当前driver没有被发起时,进入while循环while (numWorkersVisited < numWorkersAlive && !launched) {//从可用worker列表中依据顺序取出下一个没有被访问到的workerval worker = shuffledAliveWorkers(curPos)//判断当前worker是不是driver和worker都没有启动过的空节点isClusterIdle = worker.drivers.isEmpty && worker.executors.isEmpty//访问的worker记录数加1numWorkersVisited += 1//重点一:如果worker节点的资源(内存、cpu等)满足driver的发起条件,则尝试在该worker上启动driverif (canLaunchDriver(worker, driver.desc)) {val allocated = worker.acquireResources(driver.desc.resourceReqs)driver.withResources(allocated)//重点二:在该worker上发起driver运行launchDriver(worker, driver)//driver发起后,将该driver从等待调度的队列中移除waitingDrivers -= drivelaunched = true}//移动可用worker数组的下标curPos = (curPos + 1) % numWorkersAlive}//如果存在worker节点既没运行driver也没运行executor,但是资源仍不符合driver运行要求,说明driver需要的资源很多,我们需要调整所有worker的资源配给if (!launched && isClusterIdle) {logWarning(s"Driver ${driver.id} requires more resource than any of Workers could have.")}}//重点三:启动executorstartExecutorsOnWorkers()}

schedule方法中又有三处比较关键的点,分别是driver启动资源的判断、driver调度的发起、worker调度的发起。下面分别看一下:

重点一:如果worker节点的资源(内存、cpu等)满足driver的发起条件,则尝试在该worker上启动driver

  private def canLaunchDriver(worker: WorkerInfo, desc: DriverDescription): Boolean = {canLaunch(worker, desc.mem, desc.cores, desc.resourceReqs)}private def canLaunch(worker: WorkerInfo,memoryReq: Int, // driver的内存需求coresReq: Int,  //driver的cpu需求resourceRequirements: Seq[ResourceRequirement]) //driver的resource需求: Boolean = {val enoughMem = worker.memoryFree >= memoryReqval enoughCores = worker.coresFree >= coresReqval enoughResources = ResourceUtils.resourcesMeetRequirements(worker.resourcesAmountFree, resourceRequirements)enoughMem && enoughCores && enoughResources}

可以看到资源判断逻辑很简单,就是判断worker的内存、cpu、resource数三个指标是否同时符合driver的启动要求。这里留意下memoryFree 、coresFree 、resourcesAmountFree三个变量名,在后续讲解executor具体创建时还会涉及该变量。

重点二:在该worker上发起driver运行

在worker上发起driver绝对是重中之重,虽然我们的WordCount案例走的是client模式,不会有driver的调度和运行发起。但是由于这块实在太核心,所以这里也介绍下:

  private def launchDriver(worker: WorkerInfo, driver: DriverInfo): Unit = {logInfo("Launching driver " + driver.id + " on worker " + worker.id)//将driver加入worker内存的缓存结构//将worker内使用的内存和cpu数量,都加上driver需要的内存和cpu数量worker.addDriver(driver)//把worker加入到driver内部的缓存结构中driver.worker = Some(worker)//向worker发送LaunchDriver事件,让worker来启动driverworker.endpoint.send(LaunchDriver(driver.id, driver.desc, driver.resources))//将driver状态设置为RUNNINGdriver.state = DriverState.RUNNING}

可以看到driver的发起其实比较简单,主要是将worker和driver相互引用,更新worker的资源使用情况,然后通过RPC节点,向对应的worker节点发送LaunchDriver事件,接下来我们到worker上看下其接收到LaunchDriver事件后的处理逻辑:

case LaunchDriver(driverId, driverDesc, resources_) =>logInfo(s"Asked to launch driver $driverId")//构建并启动DriverRunner线程(该线程会启动一个Driver进程)val driver = new DriverRunner(conf,driverId,workDir,sparkHome,driverDesc.copy(command = Worker.maybeUpdateSSLSettings(driverDesc.command, conf)),self,workerUri,securityMgr,resources_)drivers(driverId) = driver//启动DriverRunnerdriver.start()//当前worker加上driver运行需要的内存和cpu等资源coresUsed += driverDesc.coresmemoryUsed += driverDesc.memaddResourcesUsed(resources_)

我们接着看下DriverRunner的start方法:

private[worker] def start() = {//启动一个java线程new Thread("DriverRunner for " + driverId) {override def run(): Unit = {//设置线程执行结束的钩子处理函数var shutdownHook: AnyRef = nulltry {shutdownHook = ShutdownHookManager.addShutdownHook { () =>logInfo(s"Worker shutting down, killing driver $driverId")kill()}//重点:准备driver需要的jar文件并运行driverval exitCode = prepareAndRunDriver()//对driver的状态做一些封装处理finalState = if (exitCode == 0) {Some(DriverState.FINISHED)} else if (killed) {Some(DriverState.KILLED)} else {Some(DriverState.FAILED)}} catch {case e: Exception =>kill()finalState = Some(DriverState.ERROR)finalException = Some(e)} finally {if (shutdownHook != null) {ShutdownHookManager.removeShutdownHook(shutdownHook)}}// 这个DriverRunner线程,向它所属的worker,发送一个DriverStateChanged事件worker.send(DriverStateChanged(driverId, finalState.get, finalException))}}.start()}

在driverRunner中会单独启动一个线程准备driver运行环境和运行driver,并将运行结果封装后发送给当前worker节点,接着我们在看下worker如何准备driver运行环境与发起driver运行的。

  private[worker] def prepareAndRunDriver(): Int = {//创建工作目录val driverDir = createWorkingDirectory()//下载用户上传的jar(我们编写完spark应用程序,打包成的jar)val localJarFilename = downloadUserJar(driverDir)//准备额外的资源文件val resourceFileOpt = prepareResourcesFile(SPARK_DRIVER_PREFIX, resources, driverDir)def substituteVariables(argument: String): String = argument match {case "{{WORKER_URL}}" => workerUrlcase "{{USER_JAR}}" => localJarFilenamecase other => other}val javaOpts = driverDesc.command.javaOpts ++ resourceFileOpt.map(f =>Seq(s"-D${DRIVER_RESOURCES_FILE.key}=${f.getAbsolutePath}")).getOrElse(Seq.empty)//构建ProcessBuilder,传入了driver启动命令,需要的内存大小等信息val builder = CommandUtils.buildProcessBuilder(driverDesc.command.copy(javaOpts = javaOpts),securityManager, driverDesc.mem, sparkHome.getAbsolutePath, substituteVariables)//通过ProcessBuilder启动driver进程runDriver(builder, driverDir, driverDesc.supervise)}

可以看到worker会为了driver先创建对应的工作目录,随后通过命令调用的方式调度driver的运行,这里就不进一步的研究运行的命令具体是什么了,这个跟我们阅读源码的初衷偏离了很多。所以关于driver的启动暂时先介绍到这,我们回归主线,接着介绍worker创建任务的调度过程。

重点三:在worker上启动executor

还记得我们前一章讲解的executor构建命令拼装吗,其最后都封装在了application中,因此我们在创建启动executor的时候可以直接到application中获取相关的信息。

在worker上调度和启动executor的源码如下:

  private def startExecutorsOnWorkers(): Unit = {for (app <- waitingApps) {//获取每个executor的cpu需求数,默认是1val coresPerExecutor = app.desc.coresPerExecutor.getOrElse(1)// 如果app需要的core大于等于每个executor需要的core,则接着处理,如果app剩余需要的cpu数据,小于executor构建需要的cpu数,则不在进行新executor的构建(节省资源)if (app.coresLeft >= coresPerExecutor) {// 过滤出ALIVE状态且资源符合executor创建的worker节点val usableWorkers = workers.toArray.filter(_.state == WorkerState.ALIVE).filter(canLaunchExecutor(_, app.desc)).sortBy(_.coresFree).reverse//如果只有一个app等待创建executor,但是仍没有可用worker创建executor,说明集群资源不够,打印warn警告val appMayHang = waitingApps.length == 1 &&waitingApps.head.executors.isEmpty && usableWorkers.isEmptyif (appMayHang) {logWarning(s"App ${app.id} requires more resource than any of Workers could have.")}//重点一:计算每个worker要分配出去的cpu数,该方法返回的是一个数组val assignedCores = scheduleExecutorsOnWorkers(app, usableWorkers, spreadOutApps)//重点二:根据上一步计算的每个worker要分配出去的cpu数,遍历可用worker,如果该worker需要分配的cpu数大于0,则向该worker发送对应的LaunchExecutor事件for (pos <- 0 until usableWorkers.length if assignedCores(pos) > 0) {allocateWorkerResourceToExecutors(app, assignedCores(pos), app.desc.coresPerExecutor, usableWorkers(pos))}}}}

这里有一个比较绕的逻辑点,就是当application剩余需要的cpu数大于单个executor需要的cpu数据才进行新executor的创建,这一步的目的个人猜测是节省资源,要知道,如果application的剩余需求cpu小于单executor需要的cpu数,要么是已经存在了运行的executor,要么就是配置有问题,正常情况下很少出现application需求cpu数小于executor创建需求cpu数的情况。

整体来看,这块有两个重要的步骤,我们分别看下:

重点一:计算每个worker要分配出去的cpu数

计算worker分配cpu数的算法有两种,一种是spreadOutApps(默认):遍历worker,一次只在worker上分配一个executor,另一种是非spreadOutApps:尽量在一个worker启动所有executor,直到分配结束或者资源不够。下面我们看下详细代码:

  private def scheduleExecutorsOnWorkers(app: ApplicationInfo,usableWorkers: Array[WorkerInfo],spreadOutApps: Boolean): Array[Int] = {// 获取app中设置或者默认的每个executor需要的cpu和内存数val coresPerExecutor = app.desc.coresPerExecutorval minCoresPerExecutor = coresPerExecutor.getOrElse(1)val oneExecutorPerWorker = coresPerExecutor.isEmptyval memoryPerExecutor = app.desc.memoryPerExecutorMBval resourceReqsPerExecutor = app.desc.resourceReqsPerExecutor//获取可用worker的数量val numUsable = usableWorkers.length//创建一个空数组,存储了每个worker要分配出去的cpu数量val assignedCores = new Array[Int](numUsable) //创建一个空数组,存储了每个worker要创建的executor的数量(资源足够或者spark.executor.cores未设置的情况下,可以在一个worker上启动多个executor)val assignedExecutors = new Array[Int](numUsable) //获取到底要分配多少cpu,取app剩余要分配的cpu数量和worker总共可用cpu数量的最小值var coresToAssign = math.min(app.coresLeft, usableWorkers.map(_.coresFree).sum)// 内部方法,用于判断worker的资源是否符合发起executor创建def canLaunchExecutorForApp(pos: Int): Boolean = {val keepScheduling = coresToAssign >= minCoresPerExecutorval enoughCores = usableWorkers(pos).coresFree - assignedCores(pos) >= minCoresPerExecutorval assignedExecutorNum = assignedExecutors(pos)val launchingNewExecutor = !oneExecutorPerWorker || assignedExecutorNum == 0if (launchingNewExecutor) {val assignedMemory = assignedExecutorNum * memoryPerExecutorval enoughMemory = usableWorkers(pos).memoryFree - assignedMemory >= memoryPerExecutorval assignedResources = resourceReqsPerExecutor.map {req => req.resourceName -> req.amount * assignedExecutorNum}.toMapval resourcesFree = usableWorkers(pos).resourcesAmountFree.map {case (rName, free) => rName -> (free - assignedResources.getOrElse(rName, 0))}val enoughResources = ResourceUtils.resourcesMeetRequirements(resourcesFree, resourceReqsPerExecutor)val underLimit = assignedExecutors.sum + app.executors.size < app.executorLimitkeepScheduling && enoughCores && enoughMemory && enoughResources && underLimit} else {keepScheduling && enoughCores}}// 获取可以符合executor构建条件的workersvar freeWorkers = (0 until numUsable).filter(canLaunchExecutorForApp)while (freeWorkers.nonEmpty) {//遍历worker进行分配freeWorkers.foreach { pos =>var keepScheduling = truewhile (keepScheduling && canLaunchExecutorForApp(pos)) {//待分配cpu数减去executor最小分配cpu数coresToAssign -= minCoresPerExecutor//记录当前worker分配出去的cpu数assignedCores(pos) += minCoresPerExecutor// 如果没设置executor需要的cpu数量,默认executor需要的cpu数为1,且可以再一个worker上启动该app的多个executor,该worker消耗总cpu数仍是1if (oneExecutorPerWorker) {assignedExecutors(pos) = 1} else {assignedExecutors(pos) += 1}//spreadOutApps调度模式一次只在一个worker上创建executor。非spreadOutApps调度模式,会尽可能在一个worker上启动所有的executor,直到资源不够再结束。if (spreadOutApps) {keepScheduling = false}}}//分配一轮后,再次过滤可以接着进行分配的workerfreeWorkers = freeWorkers.filter(canLaunchExecutorForApp)}//返回每个worker分配的core数量assignedCores}

这里其实判断worker资源能否符合executor创建的逻辑是比较复杂和重要的,但是由于这块跟我们主线关联不是特别大,且逻辑不易梳理,所以这块并没有详细介绍,后面有时间在补上。

经过上述步骤我们拿到了每个worker要分配的cpu数,接下来我们看下如何发起executor构建事件。

重点二:向worker发送对应的LaunchExecutor事件

通过上一步计算出了每个worker要分配出去的cpu数,这里接着遍历可用worker,如果该worker需要分配的cpu数大于0,则向该worker发送对应的LaunchExecutor事件。源码如下:

  private def allocateWorkerResourceToExecutors(app: ApplicationInfo,assignedCores: Int,coresPerExecutor: Option[Int],worker: WorkerInfo): Unit = {// 根据当前worker要分配的cpu总核数和每个executor需要的cpu数,计算出当前worker要分配的executor数val numExecutors = coresPerExecutor.map { assignedCores / _ }.getOrElse(1)// 获取当前worker要分配的cpu数val coresToAssign = coresPerExecutor.getOrElse(assignedCores)//循环创建指定数目的executorfor (i <- 1 to numExecutors) {//从worker获取指定资源  val allocated = worker.acquireResources(app.desc.resourceReqsPerExecutor)//创建ExecutorDesc对象,并将其记录到application中val exec = app.addExecutor(worker, coresToAssign, allocated)//重点:发起executor的创建launchExecutor(worker, exec)//修改application的状态为runningapp.state = ApplicationState.RUNNING}}

这块的逻辑主要是根据worker要分配的总cpu数和每个executor要分配的cpu数计算出要在该worker上创建的executor数据,随后封装executor的构建信息为ExecutorDesc对象,再通过launchExecutor进行处理。我们再深入看下:

  private def launchExecutor(worker: WorkerInfo, exec: ExecutorDesc): Unit = {logInfo("Launching executor " + exec.fullId + " on worker " + worker.id)//将executor加入worker内部的缓存worker.addExecutor(exec)//重点一:通知Worker节点启动executorworker.endpoint.send(LaunchExecutor(masterUrl, exec.application.id, exec.id,exec.application.desc, exec.cores, exec.memory, exec.resources))//重点二:发消息告诉该application对应Driver该worker上的executor已经启动exec.application.driver.send(ExecutorAdded(exec.id, worker.id, worker.hostPort, exec.cores, exec.memory))}

这块代码量不多,但是涉及到了两个RPC节点的通信,我们分别看下:

重点一:通知Worker节点启动executor

worker节点接收了LaunchExecutor事件后如何处理,这个我们放到下一章将,因为除了对executor如何构建感兴趣外,我们还想看看executor到底是个什么东西。所以这块准备单独开个文章讲。

重点二:将executor信息注册到Driver上

这里将executor注册进driver,在我们的WordCount案例中,因为我们是以client模式提交任务,所以实际上是将事件发送到TaskScheduler启动阶段创建的StandaloneAppClient节点上,下面我们到该节点对象上看下其接收后的处理逻辑:

      case ExecutorAdded(id: Int, workerId: String, hostPort: String, cores: Int, memory: Int) =>val fullId = appId + "/" + idlogInfo("Executor added: %s on %s (%s) with %d core(s)".format(fullId, workerId, hostPort,cores))listener.executorAdded(fullId, workerId, hostPort, cores, memory)

这里的listenr是一个接口,我们这里直接看它的实现类方法:

  override def executorAdded(fullId: String, workerId: String, hostPort: String, cores: Int,memory: Int): Unit = {logInfo("Granted executor ID %s on hostPort %s with %d core(s), %s RAM".format(fullId, hostPort, cores, Utils.megabytesToString(memory)))}

可以看到driver接收到executor之后没有任何记录,那么它后续怎么下发task任务给executor呢?这里说实话我也不知道,先留个悬念,再讲解job、stage、task的任务切割时,我们再结合这里的代码好好理一下。同时也对这的问题进行解答。

临时解答一:CoarseGrainedExecutorBackend创建Executor即将完成时会向CoarseGrainedSchedulerBackend(StandaloneSchedulerBackend继承了它)发送RegisterExecutor事件,这个时候会存储executor的相关信息。所以driver中实际上是记录了executor的信息的,至于最后是不是使用的这些信息,我们还不得而知,这个在我们阅读task任务源码时再进行补充解答。

最终解答:TaskScheduler(TaskScheduler是泛成,具体是CoarseGrainedSchedulerBackend对象)向executor提交task任务时,会从executorDataMap集合中选取全部活跃的executor,随后再经过一系列的判断进而确定所用的executor。这里的executorDataMap则是在“临时解答一”中介绍的RegisterExecutor事件里被注入executor的对象信息。

本篇文章的内容到这里就算结束了,下面是一些上述内容的总结:

1、在master节点上,它认为的driver RPC节点实际上是在TaskScheduler启动阶段生成的StandaloneAppClient。

2、driver的调度优先于executor的创建(cluster部署模式driver需要调度,client模式则不需要)

3、每个worker默认仅运行一个driver在它上面

4、executor的启动信息存储在application中

5、如果application需要的core大于等于单个executor需要的core,则发起新executor的构建,如果application需要的cpu数据,小于executor构建需要的cpu数,则不在进行新executor的构建(节省资源,个人推测

6、计算worker分配cpu数的算法有两种,一种是spreadOutApps:遍历worker,一次只在worker上分配一个executor,另一种是非spreadOutApps:尽量在一个worker启动所有executor,直到分配结束或者资源不够

7、如果没设置executor需要的cpu数量,默认executor需要的cpu数为1,且可以再一个worker上启动该app的多个executor,该worker消耗总cpu数仍是1。

spark源码(三)spark 如何进行driver、executor任务的调度,以及executor向driver的注册相关推荐

  1. Spark源码走读概述

    Spark代码量 --Spark:20000loc --Hadoop 1.0:90000loc --Hadoop 2.0:220000loc Spark生态系统代码量 Spark生态系统 概述 --构 ...

  2. Spark源码分析之七:Task运行(一)

    在Task调度相关的两篇文章<Spark源码分析之五:Task调度(一)>与<Spark源码分析之六:Task调度(二)>中,我们大致了解了Task调度相关的主要逻辑,并且在T ...

  3. sparkcore分区_Spark学习:Spark源码和调优简介 Spark Core (二)

    本文基于 Spark 2.4.4 版本的源码,试图分析其 Core 模块的部分实现原理,其中如有错误,请指正.为了简化论述,将部分细节放到了源码中作为注释,因此正文中是主要内容. 第一部分内容见: S ...

  4. spark 源码分析之十八 -- Spark存储体系剖析

    本篇文章主要剖析BlockManager相关的类以及总结Spark底层存储体系. 总述 先看 BlockManager相关类之间的关系如下: 我们从NettyRpcEnv 开始,做一下简单说明. Ne ...

  5. spark 源码分析之二十 -- Stage的提交

    引言 上篇 spark 源码分析之十九 -- DAG的生成和Stage的划分 中,主要介绍了下图中的前两个阶段DAG的构建和Stage的划分. 本篇文章主要剖析,Stage是如何提交的. rdd的依赖 ...

  6. Spark源码和调优简介 Spark Core

    作者:calvinrzluo,腾讯 IEG 后台开发工程师 本文基于 Spark 2.4.4 版本的源码,试图分析其 Core 模块的部分实现原理,其中如有错误,请指正.为了简化论述,将部分细节放到了 ...

  7. spark 源码分析之十九 -- DAG的生成和Stage的划分

    上篇文章 spark 源码分析之十八 -- Spark存储体系剖析 重点剖析了 Spark的存储体系.从本篇文章开始,剖析Spark作业的调度和计算体系. 在说DAG之前,先简单说一下RDD. 对RD ...

  8. Spark源码分析之Sort-Based Shuffle读写流程

    一 概述 我们知道Spark Shuffle机制总共有三种: # 未优化的Hash Shuffle:每一个ShuffleMapTask都会为每一个ReducerTask创建一个单独的文件,总的文件数是 ...

  9. Spark源码阅读——DirectInputDStream

    2019独角兽企业重金招聘Python工程师标准>>> Spark源码分析--DirectInputDStream 在Spark-Streaming中,对流的抽象是使用DStream ...

  10. Spark源码解读之Shuffle计算引擎剖析

    Shuffle是Spark计算引擎的关键所在,是必须经历的一个阶段,在前面的文章中,我们剖析了Shuffle的原理以及Map阶段结果的输出与Reduce阶段结果如何读取.该篇文章是对前面两篇文章 [S ...

最新文章

  1. fpga如何约束走线_FPGA中的CLOCK REGION和SLR是什么含义
  2. 今晚,圆桌讨论Transformer跨界CV任务
  3. python重命名文件或目录_Python重命名多个文件的实例方法
  4. javascript等号判断相等流程
  5. SAP Spartacus加载delivery region的实现
  6. 手机展示海报就用它 再不要羡慕别人
  7. Java学习系列(十二)Java面向对象之序列化机制及版本
  8. sklearn 决策树无法处理类别特征
  9. Java Map排序
  10. mysql的全量备份和增量备份
  11. SIFT算法特征描述子构建---关键点定位原理及代码
  12. 转型之路:从数字化到数智化〡数智洞察
  13. Exception in thread “main“ java.lang.NullPointerException问题
  14. 求[X,Y]内被除3余1并且被除5余3的整数的和
  15. A2B的典型应用-给汽车的车机系统做从设备板
  16. java查看附近门店_微信公众号获取用户地理位置并列出附近的门店的示例代码...
  17. DataStream API:Overview
  18. 文献阅读:ACME: pan-specific peptide–MHC class I binding prediction through attention-based
  19. css compressor java_javascript/css压缩工具---yuicompressor使用方法
  20. Django 2.1.3 模型层 模型

热门文章

  1. 【android开发中的小技巧-1】
  2. springboot配置aop切面详解
  3. 预见2020关键一年|第二届中国区块链产业经济年会在国际金融博物馆召开
  4. 九龙战登录只显示一个服务器,九龙战新手必读 带你走出六大误区
  5. html中引入vue在线地址并使用
  6. 蒸妙发展的奇妙“5+N”故事擅长将合作方变股东
  7. 2021-02-24let和const,变量的解构赋值,函数扩展
  8. CoVOS:无需解码!利用压缩视频比特流的运动矢量和残差进行半监督的VOS加速(CVPR 2022)...
  9. SSH整合 简单的增删改查
  10. 无酒精啤酒行业调研报告 - 市场现状分析与发展前景预测