本期内容:

1 Executor WAL

2 消息重放

3 其他

  一切不能进行实时流处理的数据都是无效的数据。在流处理时代,SparkStreaming有着强大吸引力,而且发展前景广阔,加之Spark的生态系统,Streaming可以方便调用其他的诸如SQL,MLlib等强大框架,它必将一统天下。

  Spark Streaming运行时与其说是Spark Core上的一个流式处理框架,不如说是Spark Core上的一个最复杂的应用程序。如果可以掌握Spark streaming这个复杂的应用程序,那么其他的再复杂的应用程序都不在话下了。这里选择Spark Streaming作为版本定制的切入点也是大势所趋。

  

  Executor 的数据安全容错非常重要(计算容错主要借助Spark Core的容错机制容错,天然就是容错的),这里的容错是数据的安全容错。

  这里一共有俩种方式来写数据,一种是WriteAheadLogBasedBlockHandler,一种是BlockManagerBasedBlockHandler.这俩种方式保证了数据可以回放。并且在这个过程中,如果没有指定checkpoint的目录的话,会抛出异常。

  在这里我们着重研究WAL的方式,也就是WriteAheadLogBasedBlockHandler

private[streaming] object WriteAheadLogBasedBlockHandler{def checkpointDirToLogDir(checkpointDir:String, streamId: Int): String = {new Path(checkpointDir,new Path("receivedData", streamId.toString)).toString}
}

  这里checkpointDirToLogDir创建了一个保存数据的路径。

def createLogForReceiver(sparkConf: SparkConf,fileWalLogDirectory: String,fileWalHadoopConf: Configuration): WriteAheadLog = {createLog(false, sparkConf, fileWalLogDirectory, fileWalHadoopConf)
}

  该方法的实质的调用了createLog方法

private def createLog(isDriver: Boolean,sparkConf: SparkConf,fileWalLogDirectory: String,fileWalHadoopConf: Configuration): WriteAheadLog = {val classNameOption= if (isDriver) {sparkConf.getOption(DRIVER_WAL_CLASS_CONF_KEY)} else {sparkConf.getOption(RECEIVER_WAL_CLASS_CONF_KEY)}val wal =classNameOption.map { className =>try {instantiateClass(Utils.classForName(className).asInstanceOf[Class[_ <: WriteAheadLog]], sparkConf)} catch {case NonFatal(e) =>throw new SparkException(s"Couldnot create a write ahead log of class $className", e)}}.getOrElse {new FileBasedWriteAheadLog(sparkConf,fileWalLogDirectory, fileWalHadoopConf,getRollingIntervalSecs(sparkConf,isDriver), getMaxFailures(sparkConf, isDriver),shouldCloseFileAfterWrite(sparkConf,isDriver))}if (isBatchingEnabled(sparkConf,isDriver)) {new BatchedWriteAheadLog(wal,sparkConf)} else {wal}
}

  这里默认创建的是FileBasedWriteAheadLog。

  再回到storeBlock中,这里有storeInBlockManagerFuture和storeInWriteAheadLogFuture两个方法。所以数据存入BlockManager和WAl同时进行。完成之后,就可以交给trackEndpoint消息循环体了

def storeBlock(blockId: StreamBlockId, block:ReceivedBlock): ReceivedBlockStoreResult = {var numRecords= None: Option[Long]// Serialize the block so that it can be inserted intobothval serializedBlock= block match {case ArrayBufferBlock(arrayBuffer)=>numRecords = Some(arrayBuffer.size.toLong)blockManager.dataSerialize(blockId,arrayBuffer.iterator)case IteratorBlock(iterator)=>val countIterator= new CountingIterator(iterator)val serializedBlock= blockManager.dataSerialize(blockId, countIterator)numRecords = countIterator.countserializedBlockcase ByteBufferBlock(byteBuffer)=>byteBuffercase _=>throw new Exception(s"Could notpush $blockId to block manager, unexpected block type")}// Store the block in block managerval storeInBlockManagerFuture= Future {val putResult=blockManager.putBytes(blockId,serializedBlock, effectiveStorageLevel, tellMaster = true)if (!putResult.map{ _._1 }.contains(blockId)) {throw new SparkException(s"Could not store $blockId to block manager with storage level $storageLevel")}}// Store the block in write ahead logval storeInWriteAheadLogFuture= Future {writeAheadLog.write(serializedBlock, clock.getTimeMillis())}// Combine the futures, wait for both to complete, andreturn the write ahead log record handleval combinedFuture= storeInBlockManagerFuture.zip(storeInWriteAheadLogFuture).map(_._2)val walRecordHandle= Await.result(combinedFuture, blockStoreTimeout)WriteAheadLogBasedStoreResult(blockId,numRecords, walRecordHandle)
}

  

备注:

资料来源于:DT_大数据梦工厂(Spark发行版本定制)

更多私密内容,请关注微信公众号:DT_Spark

如果您对大数据Spark感兴趣,可以免费听由王家林老师每天晚上20:00开设的Spark永久免费公开课,地址YY房间号:68917580

转载于:https://www.cnblogs.com/pzwxySpark/p/Spark12.html

Spark版本定制第12天:Executor容错安全性相关推荐

  1. spark版本bug总结

    spark报错总结 spark sql 1.spark sql 删除分区报错 mismatched input '<=' expecting 2.spark sql 读取hive parquet ...

  2. 查看spark版本linux,如何查看spark版本和scala版本

    如何查看spark版本和scala版本 发布时间:2018-11-21 05:48, 浏览次数:2445 , 标签: spark scala 1.进入命令行状态 windows 电脑 方法一: 在系统 ...

  3. oracle 11.2.0.4 bug,警示:一个专为AIX上oracle11.2.0.4版本定制的Bug正在高发

    作者:盖国强 有这么一个Bug,仅在AIX平台上,Oracle Database 11.2.0.4的版本中出现,在12.1中被修复,之前和之后都不存在,所以简直是为这一版本定制的. 之前一些客户零星的 ...

  4. 【Android 逆向】frida 框架安装 ( 设置 Python 3.7 版本 | 安装 frida 12.7.5 版本 | 安装 frida-tools 5.1.0 版本 )

    文章目录 前言 一.设置 Python 3.73.73.7 版本 二.安装 frida 12.7.512.7.512.7.5 版本 三.安装 frida-tools 5.1.05.1.05.1.0 版 ...

  5. mysql8.0.12url_使用最新版本MySQL8.0.12报错记录

    使用最新版本MySQL报错,MySQL版本8.0.12. 报错1: Sun Oct 14 00:45:30 CST 2018 WARN: Establishing SSL connection wit ...

  6. lombok导入报错,版本1.18.12已在maven本地仓库中

    项目场景: 跟着网上一位博主的教程搭建一个基于SSM的图书管理系统 SSM整合(图书管理系统) 问题描述: 在复制到Books.java类时,idea显示报错(我这里已修复,所以这里的导入语句没有报错 ...

  7. postgresql立式版本下载_PostgreSQL 12.0 正式版本发布

    2019-10-03,PostgreSQL全球开发组今天宣布,世界上功能最为强大的开源数据库发布PostgreSQL 12版本发布. PostgreSQL 12版在各方面都得到了加强,包括显著地提升查 ...

  8. 如何查询spark版本_掌握Spark SQL中的查询执行

    了解您的查询计划 自从Spark 2.x以来,由于SQL和声明性DataFrame API,在Spark中查询数据已成为一种奢侈. 仅使用几行高级代码就可以表达非常复杂的逻辑并执行复杂的转换. API ...

  9. dep指定版本 go_Go 1.12 版本的新特性

    Go 1.12 昨天,Go 官方发布 1.12 版本.本文介绍下 Go 1.12 版本变更的内容. Go 1.12 正式版发布了,距离上个正式发布版 Go 1.11 已经过去半年.跟往常一样,Go 1 ...

最新文章

  1. 【Sprint3冲刺之前】TD学生助手测试用例
  2. 【论文解读】KDD2020最佳论文: 关于个性化排序任务评价指标的大讨论
  3. SmartRF Flash Programmer1.6.2打不开程序界面问题
  4. springMVC参数的传递方式(1.通过@PathVariabl获取路径参数,2.@ModelAttribute获取数据,3.HttpServletRequest取参,4@RequestParam)
  5. Failed to read auto-increment value from storageengine错误的处理方法
  6. 如何做到自动化运营--数据驱动
  7. ACM-ICPC 2019 山东省省赛 A Calandar
  8. 牛客练习赛71 F 红蓝图(kruskal重构树)
  9. java规则表达式_Java基础--正则表达式的规则
  10. java微博开发_【新手入门篇】新浪微博应用开发之Java入门篇
  11. @ApiImplicitParam注解的dataType、paramType两个属性的区别?
  12. css如何实现div背景透明
  13. 2018百战程序员大数据全套教程
  14. 谷歌地图开放俄军事设施高分辨率卫星图
  15. F-Droid换源的坑
  16. 人人都该懂点儿TCP
  17. 【晶体管电路设计】一、晶体管设计概述与共射极放大电路
  18. 我是一名自由职业白帽黑客
  19. CVTE嵌入式应用开发长期实习生线上笔试+线上面试(一面)
  20. 区块链笔记 - 1、区块链的来龙去脉

热门文章

  1. 甲骨文们是怎么被干掉的
  2. 机器学习笔记十:各种熵总结
  3. SAP WM 有无保存WM Level历史库存的Table?
  4. 【十大经典数据挖掘算法】k-means
  5. 费马大定理:三百年数学圣杯的角逐
  6. MIT发布2018年全球10大突破性技术!
  7. 【重磅】吴恩达宣布 Drive.ai 自动驾驶汽车服务落地 理想就这样成了现实!
  8. IDC:2018年中国制造业十大预测
  9. 福利内卷时代来临!腾讯为 3300 名员工发 11 亿红包
  10. 王者又连跪了?快让 AI 帮你上分!