Spark版本定制第12天:Executor容错安全性
本期内容:
1 Executor WAL
2 消息重放
3 其他
一切不能进行实时流处理的数据都是无效的数据。在流处理时代,SparkStreaming有着强大吸引力,而且发展前景广阔,加之Spark的生态系统,Streaming可以方便调用其他的诸如SQL,MLlib等强大框架,它必将一统天下。
Spark Streaming运行时与其说是Spark Core上的一个流式处理框架,不如说是Spark Core上的一个最复杂的应用程序。如果可以掌握Spark streaming这个复杂的应用程序,那么其他的再复杂的应用程序都不在话下了。这里选择Spark Streaming作为版本定制的切入点也是大势所趋。
Executor 的数据安全容错非常重要(计算容错主要借助Spark Core的容错机制容错,天然就是容错的),这里的容错是数据的安全容错。
这里一共有俩种方式来写数据,一种是WriteAheadLogBasedBlockHandler,一种是BlockManagerBasedBlockHandler.这俩种方式保证了数据可以回放。并且在这个过程中,如果没有指定checkpoint的目录的话,会抛出异常。
在这里我们着重研究WAL的方式,也就是WriteAheadLogBasedBlockHandler
private[streaming] object WriteAheadLogBasedBlockHandler{def checkpointDirToLogDir(checkpointDir:String, streamId: Int): String = {new Path(checkpointDir,new Path("receivedData", streamId.toString)).toString} }
这里checkpointDirToLogDir创建了一个保存数据的路径。
def createLogForReceiver(sparkConf: SparkConf,fileWalLogDirectory: String,fileWalHadoopConf: Configuration): WriteAheadLog = {createLog(false, sparkConf, fileWalLogDirectory, fileWalHadoopConf) }
该方法的实质的调用了createLog方法
private def createLog(isDriver: Boolean,sparkConf: SparkConf,fileWalLogDirectory: String,fileWalHadoopConf: Configuration): WriteAheadLog = {val classNameOption= if (isDriver) {sparkConf.getOption(DRIVER_WAL_CLASS_CONF_KEY)} else {sparkConf.getOption(RECEIVER_WAL_CLASS_CONF_KEY)}val wal =classNameOption.map { className =>try {instantiateClass(Utils.classForName(className).asInstanceOf[Class[_ <: WriteAheadLog]], sparkConf)} catch {case NonFatal(e) =>throw new SparkException(s"Couldnot create a write ahead log of class $className", e)}}.getOrElse {new FileBasedWriteAheadLog(sparkConf,fileWalLogDirectory, fileWalHadoopConf,getRollingIntervalSecs(sparkConf,isDriver), getMaxFailures(sparkConf, isDriver),shouldCloseFileAfterWrite(sparkConf,isDriver))}if (isBatchingEnabled(sparkConf,isDriver)) {new BatchedWriteAheadLog(wal,sparkConf)} else {wal} }
这里默认创建的是FileBasedWriteAheadLog。
再回到storeBlock中,这里有storeInBlockManagerFuture和storeInWriteAheadLogFuture两个方法。所以数据存入BlockManager和WAl同时进行。完成之后,就可以交给trackEndpoint消息循环体了
def storeBlock(blockId: StreamBlockId, block:ReceivedBlock): ReceivedBlockStoreResult = {var numRecords= None: Option[Long]// Serialize the block so that it can be inserted intobothval serializedBlock= block match {case ArrayBufferBlock(arrayBuffer)=>numRecords = Some(arrayBuffer.size.toLong)blockManager.dataSerialize(blockId,arrayBuffer.iterator)case IteratorBlock(iterator)=>val countIterator= new CountingIterator(iterator)val serializedBlock= blockManager.dataSerialize(blockId, countIterator)numRecords = countIterator.countserializedBlockcase ByteBufferBlock(byteBuffer)=>byteBuffercase _=>throw new Exception(s"Could notpush $blockId to block manager, unexpected block type")}// Store the block in block managerval storeInBlockManagerFuture= Future {val putResult=blockManager.putBytes(blockId,serializedBlock, effectiveStorageLevel, tellMaster = true)if (!putResult.map{ _._1 }.contains(blockId)) {throw new SparkException(s"Could not store $blockId to block manager with storage level $storageLevel")}}// Store the block in write ahead logval storeInWriteAheadLogFuture= Future {writeAheadLog.write(serializedBlock, clock.getTimeMillis())}// Combine the futures, wait for both to complete, andreturn the write ahead log record handleval combinedFuture= storeInBlockManagerFuture.zip(storeInWriteAheadLogFuture).map(_._2)val walRecordHandle= Await.result(combinedFuture, blockStoreTimeout)WriteAheadLogBasedStoreResult(blockId,numRecords, walRecordHandle) }
备注:
资料来源于:DT_大数据梦工厂(Spark发行版本定制)
更多私密内容,请关注微信公众号:DT_Spark
如果您对大数据Spark感兴趣,可以免费听由王家林老师每天晚上20:00开设的Spark永久免费公开课,地址YY房间号:68917580
转载于:https://www.cnblogs.com/pzwxySpark/p/Spark12.html
Spark版本定制第12天:Executor容错安全性相关推荐
- spark版本bug总结
spark报错总结 spark sql 1.spark sql 删除分区报错 mismatched input '<=' expecting 2.spark sql 读取hive parquet ...
- 查看spark版本linux,如何查看spark版本和scala版本
如何查看spark版本和scala版本 发布时间:2018-11-21 05:48, 浏览次数:2445 , 标签: spark scala 1.进入命令行状态 windows 电脑 方法一: 在系统 ...
- oracle 11.2.0.4 bug,警示:一个专为AIX上oracle11.2.0.4版本定制的Bug正在高发
作者:盖国强 有这么一个Bug,仅在AIX平台上,Oracle Database 11.2.0.4的版本中出现,在12.1中被修复,之前和之后都不存在,所以简直是为这一版本定制的. 之前一些客户零星的 ...
- 【Android 逆向】frida 框架安装 ( 设置 Python 3.7 版本 | 安装 frida 12.7.5 版本 | 安装 frida-tools 5.1.0 版本 )
文章目录 前言 一.设置 Python 3.73.73.7 版本 二.安装 frida 12.7.512.7.512.7.5 版本 三.安装 frida-tools 5.1.05.1.05.1.0 版 ...
- mysql8.0.12url_使用最新版本MySQL8.0.12报错记录
使用最新版本MySQL报错,MySQL版本8.0.12. 报错1: Sun Oct 14 00:45:30 CST 2018 WARN: Establishing SSL connection wit ...
- lombok导入报错,版本1.18.12已在maven本地仓库中
项目场景: 跟着网上一位博主的教程搭建一个基于SSM的图书管理系统 SSM整合(图书管理系统) 问题描述: 在复制到Books.java类时,idea显示报错(我这里已修复,所以这里的导入语句没有报错 ...
- postgresql立式版本下载_PostgreSQL 12.0 正式版本发布
2019-10-03,PostgreSQL全球开发组今天宣布,世界上功能最为强大的开源数据库发布PostgreSQL 12版本发布. PostgreSQL 12版在各方面都得到了加强,包括显著地提升查 ...
- 如何查询spark版本_掌握Spark SQL中的查询执行
了解您的查询计划 自从Spark 2.x以来,由于SQL和声明性DataFrame API,在Spark中查询数据已成为一种奢侈. 仅使用几行高级代码就可以表达非常复杂的逻辑并执行复杂的转换. API ...
- dep指定版本 go_Go 1.12 版本的新特性
Go 1.12 昨天,Go 官方发布 1.12 版本.本文介绍下 Go 1.12 版本变更的内容. Go 1.12 正式版发布了,距离上个正式发布版 Go 1.11 已经过去半年.跟往常一样,Go 1 ...
最新文章
- 【Sprint3冲刺之前】TD学生助手测试用例
- 【论文解读】KDD2020最佳论文: 关于个性化排序任务评价指标的大讨论
- SmartRF Flash Programmer1.6.2打不开程序界面问题
- springMVC参数的传递方式(1.通过@PathVariabl获取路径参数,2.@ModelAttribute获取数据,3.HttpServletRequest取参,4@RequestParam)
- Failed to read auto-increment value from storageengine错误的处理方法
- 如何做到自动化运营--数据驱动
- ACM-ICPC 2019 山东省省赛 A Calandar
- 牛客练习赛71 F 红蓝图(kruskal重构树)
- java规则表达式_Java基础--正则表达式的规则
- java微博开发_【新手入门篇】新浪微博应用开发之Java入门篇
- @ApiImplicitParam注解的dataType、paramType两个属性的区别?
- css如何实现div背景透明
- 2018百战程序员大数据全套教程
- 谷歌地图开放俄军事设施高分辨率卫星图
- F-Droid换源的坑
- 人人都该懂点儿TCP
- 【晶体管电路设计】一、晶体管设计概述与共射极放大电路
- 我是一名自由职业白帽黑客
- CVTE嵌入式应用开发长期实习生线上笔试+线上面试(一面)
- 区块链笔记 - 1、区块链的来龙去脉