Spark Streaming Source Code: The Full Lifecycle of Receiving Stream Data
In the previous lesson we walked through how a Receiver is launched. The Receiver is started via the ReceiverSupervisor's start method:
/** Start the supervisor */
def start() {
  onStart()
  startReceiver()
}
First, ReceiverSupervisor's onStart() method is invoked:
override protected def onStart() {
  registeredBlockGenerators.foreach { _.start() }
}
registeredBlockGenerators is populated when the ReceiverSupervisor is instantiated:
private val defaultBlockGenerator = createBlockGenerator(defaultBlockGeneratorListener)

override def createBlockGenerator(
    blockGeneratorListener: BlockGeneratorListener): BlockGenerator = {
  // Cleanup BlockGenerators that have already been stopped
  registeredBlockGenerators --= registeredBlockGenerators.filter{ _.isStopped() }
  val newBlockGenerator = new BlockGenerator(blockGeneratorListener, streamId, env.conf)
  registeredBlockGenerators += newBlockGenerator
  newBlockGenerator
}
onStart() then calls each BlockGenerator's start method:
/** Start block generating and pushing threads. */
def start(): Unit = synchronized {
  if (state == Initialized) {
    state = Active
    blockIntervalTimer.start()
    blockPushingThread.start()
    logInfo("Started BlockGenerator")
  } else {
    throw new SparkException(
      s"Cannot start BlockGenerator as its not in the Initialized state [state = $state]")
  }
}
blockIntervalTimer is a recurring timer that calls the updateCurrentBuffer function every time it fires:
private val blockIntervalTimer =
  new RecurringTimer(clock, blockIntervalMs, updateCurrentBuffer, "BlockGenerator")
The interval defaults to 200 milliseconds:
private val blockIntervalMs = conf.getTimeAsMs("spark.streaming.blockInterval", "200ms")
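spark.streaming.blockInterval is an ordinary Spark configuration entry, so it can be tuned when building the SparkConf. A minimal sketch (the app name is hypothetical); for receiver-based streams each generated block becomes one partition of the batch RDD, so batchInterval / blockInterval bounds the parallelism of each batch:

import org.apache.spark.SparkConf

val conf = new SparkConf()
  .setAppName("BlockIntervalTuning")  // hypothetical app name
  // Halving the default 200ms interval: a 2s batch can now contain up to
  // 2000ms / 100ms = 20 blocks, i.e. up to 20 partitions per batch RDD.
  .set("spark.streaming.blockInterval", "100ms")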
blockPushingThread is a thread that keeps pushing blocks to the BlockManager:
private val blockPushingThread = new Thread() {
  override def run() { keepPushingBlocks() }
}

/** Keep pushing blocks to the BlockManager. */
private def keepPushingBlocks() {
  logInfo("Started block pushing thread")

  def areBlocksBeingGenerated: Boolean = synchronized {
    state != StoppedGeneratingBlocks
  }

  try {
    // While blocks are being generated, keep polling for to-be-pushed blocks and push them.
    while (areBlocksBeingGenerated) {
      Option(blocksForPushing.poll(10, TimeUnit.MILLISECONDS)) match {
        case Some(block) => pushBlock(block)
        case None =>
      }
    }

    // At this point, state is StoppedGeneratingBlock. So drain the queue of to-be-pushed blocks.
    logInfo("Pushing out the last " + blocksForPushing.size() + " blocks")
    while (!blocksForPushing.isEmpty) {
      val block = blocksForPushing.take()
      logDebug(s"Pushing block $block")
      pushBlock(block)
      logInfo("Blocks left to push " + blocksForPushing.size())
    }
    logInfo("Stopped block pushing thread")
  } catch {
    case ie: InterruptedException =>
      logInfo("Block pushing thread was interrupted")
    case e: Exception =>
      reportError("Error in block pushing thread", e)
  }
}
As the code shows, the pushing thread polls the blocksForPushing queue with a 10 ms timeout and calls pushBlock on each block it takes out:
private def pushBlock(block: Block) {
  listener.onPushBlock(block.id, block.buffer)
  logInfo("Pushed block " + block.id)
}
The listener here is the one defined in ReceiverSupervisorImpl:
private val defaultBlockGeneratorListener = new BlockGeneratorListener {
  def onAddData(data: Any, metadata: Any): Unit = { }

  def onGenerateBlock(blockId: StreamBlockId): Unit = { }

  def onError(message: String, throwable: Throwable) {
    reportError(message, throwable)
  }

  def onPushBlock(blockId: StreamBlockId, arrayBuffer: ArrayBuffer[_]) {
    pushArrayBuffer(arrayBuffer, None, Some(blockId))
  }
}
So onPushBlock invokes pushArrayBuffer, which ultimately calls the following method:
/** Store block and report it to driver */
def pushAndReportBlock(
    receivedBlock: ReceivedBlock,
    metadataOption: Option[Any],
    blockIdOption: Option[StreamBlockId]) {
  val blockId = blockIdOption.getOrElse(nextBlockId)
  val time = System.currentTimeMillis
  val blockStoreResult = receivedBlockHandler.storeBlock(blockId, receivedBlock)
  logDebug(s"Pushed block $blockId in ${(System.currentTimeMillis - time)} ms")
  val numRecords = blockStoreResult.numRecords
  val blockInfo = ReceivedBlockInfo(streamId, numRecords, metadataOption, blockStoreResult)
  trackerEndpoint.askWithRetry[Boolean](AddBlock(blockInfo))
  logDebug(s"Reported block $blockId")
}
This method hands the data to receivedBlockHandler for storage and reports the block's metadata to the ReceiverTracker on the driver.
receivedBlockHandler has two implementations:
private val receivedBlockHandler: ReceivedBlockHandler = {
  if (WriteAheadLogUtils.enableReceiverLog(env.conf)) {
    if (checkpointDirOption.isEmpty) {
      throw new SparkException(
        "Cannot enable receiver write-ahead log without checkpoint directory set. " +
        "Please use streamingContext.checkpoint() to set the checkpoint directory. " +
        "See documentation for more details.")
    }
    new WriteAheadLogBasedBlockHandler(env.blockManager, receiver.streamId,
      receiver.storageLevel, env.conf, hadoopConf, checkpointDirOption.get)
  } else {
    new BlockManagerBasedBlockHandler(env.blockManager, receiver.storageLevel)
  }
}
In either case, the data ultimately ends up in the BlockManager.
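Which branch is taken is controlled by an ordinary configuration flag. A minimal sketch of opting in to the write-ahead-log handler (the app name and checkpoint path are placeholders):

import org.apache.spark.SparkConf
import org.apache.spark.streaming.{Seconds, StreamingContext}

val conf = new SparkConf()
  .setAppName("WALReceiverDemo")  // hypothetical app name
  // Selects WriteAheadLogBasedBlockHandler in the code above.
  .set("spark.streaming.receiver.writeAheadLog.enable", "true")

val ssc = new StreamingContext(conf, Seconds(2))
// Mandatory when the WAL is enabled; otherwise the handler construction
// above throws the SparkException shown in the snippet.
ssc.checkpoint("hdfs://namenode:8020/streaming-checkpoint")  // placeholder path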
blocksForPushing is defined as follows:
private val blockQueueSize = conf.getInt("spark.streaming.blockQueueSize", 10)
private val blocksForPushing = new ArrayBlockingQueue[Block](blockQueueSize)
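Because ArrayBlockingQueue is bounded, a full queue back-pressures the producer: put blocks until the pushing thread drains an element. A toy sketch of that behavior (capacity shrunk to 2 for illustration):

import java.util.concurrent.ArrayBlockingQueue

val queue = new ArrayBlockingQueue[String](2)
queue.put("block-0")
queue.put("block-1")
// A third put here would block this thread until someone polls or takes
// an element -- exactly how a full blocksForPushing queue stalls the
// blockIntervalTimer until the pushing thread catches up.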
blocksForPushing is fed by the blockIntervalTimer, which periodically moves the data accumulated in the BlockGenerator's currentBuffer into the queue:
/** Change the buffer to which single records are added to. */
private def updateCurrentBuffer(time: Long): Unit = {
  try {
    var newBlock: Block = null
    synchronized {
      if (currentBuffer.nonEmpty) {
        val newBlockBuffer = currentBuffer
        currentBuffer = new ArrayBuffer[Any]
        val blockId = StreamBlockId(receiverId, time - blockIntervalMs)
        listener.onGenerateBlock(blockId)
        newBlock = new Block(blockId, newBlockBuffer)
      }
    }

    if (newBlock != null) {
      blocksForPushing.put(newBlock)  // put is blocking when queue is full
    }
  } catch {
    case ie: InterruptedException =>
      logInfo("Block updating timer thread was interrupted")
    case e: Exception =>
      reportError("Error in block updating thread", e)
  }
}
Now let's return to the supervisor's startReceiver() method:
/** Start receiver */
def startReceiver(): Unit = synchronized {
  try {
    if (onReceiverStart()) {
      logInfo("Starting receiver")
      receiverState = Started
      receiver.onStart()
      logInfo("Called receiver onStart")
    } else {
      // The driver refused us
      stop("Registered unsuccessfully because Driver refused to start receiver " + streamId,
        None)
    }
  } catch {
    case NonFatal(t) =>
      stop("Error starting receiver " + streamId, Some(t))
  }
}
It calls the receiver's onStart method. Take SocketReceiver as an example:
def onStart() {
  // Start the thread that receives data over a connection
  new Thread("Socket Receiver") {
    setDaemon(true)
    override def run() { receive() }
  }.start()
}
This function spawns a new daemon thread named "Socket Receiver", which on start calls SocketReceiver's receive() method:
def receive() {
  var socket: Socket = null
  try {
    logInfo("Connecting to " + host + ":" + port)
    socket = new Socket(host, port)
    logInfo("Connected to " + host + ":" + port)
    val iterator = bytesToObjects(socket.getInputStream())
    while(!isStopped && iterator.hasNext) {
      store(iterator.next)
    }
    if (!isStopped()) {
      restart("Socket data stream had no more data")
    } else {
      logInfo("Stopped receiving")
    }
  } catch {
    case e: java.net.ConnectException =>
      restart("Error connecting to " + host + ":" + port, e)
    case NonFatal(e) =>
      logWarning("Error receiving data", e)
      restart("Error receiving data", e)
  } finally {
    if (socket != null) {
      socket.close()
      logInfo("Closed socket to " + host + ":" + port)
    }
  }
}
receive() opens a Socket and keeps reading data from its InputStream, calling store once for every record received:
def store(dataItem: T) {
  supervisor.pushSingle(dataItem)
}
The data is managed by the ReceiverSupervisor: store delegates to supervisor.pushSingle to hand the record over.
def pushSingle(data: Any) {
  defaultBlockGenerator.addData(data)
}
defaultBlockGenerator is the BlockGenerator created through the same createBlockGenerator method shown earlier:
private val defaultBlockGenerator = createBlockGenerator(defaultBlockGeneratorListener)
It is simply a BlockGenerator, and its addData function appends each data item to the BlockGenerator's currentBuffer:
/**
 * Push a single data item into the buffer.
 */
def addData(data: Any): Unit = {
  if (state == Active) {
    waitToPush()
    synchronized {
      if (state == Active) {
        currentBuffer += data
      } else {
        throw new SparkException(
          "Cannot add data as BlockGenerator has not been started or has been stopped")
      }
    }
  } else {
    throw new SparkException(
      "Cannot add data as BlockGenerator has not been started or has been stopped")
  }
}
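Any custom receiver rides this same path: every call to store ends up in currentBuffer via pushSingle and addData. A minimal, hypothetical sketch of such a receiver:

import org.apache.spark.storage.StorageLevel
import org.apache.spark.streaming.receiver.Receiver

// Hypothetical receiver that emits one synthetic record per second.
class CounterReceiver extends Receiver[String](StorageLevel.MEMORY_AND_DISK_2) {
  override def onStart(): Unit = {
    new Thread("Counter Receiver") {
      setDaemon(true)
      override def run(): Unit = {
        var i = 0L
        while (!isStopped()) {
          store(s"event-$i")  // store -> pushSingle -> addData -> currentBuffer
          i += 1
          Thread.sleep(1000)
        }
      }
    }.start()
  }

  // Nothing to clean up: the loop above exits once isStopped() returns true.
  override def onStop(): Unit = { }
}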
Data streams in continuously: every 200 ms the timer moves the contents of currentBuffer into the blocksForPushing queue as a new block and allocates a fresh currentBuffer, while the pushing thread polls the queue every 10 ms and hands each block it takes out to the BlockManager.
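To tie the whole lifecycle together, here is a minimal driver-side sketch that exercises exactly this pipeline, assuming a text source such as nc -lk 9999 is running on localhost:

import org.apache.spark.SparkConf
import org.apache.spark.streaming.{Seconds, StreamingContext}

object SocketWordCount {
  def main(args: Array[String]): Unit = {
    // local[2]: one core for the SocketReceiver, one for processing.
    val conf = new SparkConf().setMaster("local[2]").setAppName("SocketWordCount")
    val ssc = new StreamingContext(conf, Seconds(2))

    // Each 2s batch is assembled from the blocks generated above:
    // up to 2000ms / 200ms = 10 blocks, hence up to 10 partitions per batch.
    val lines = ssc.socketTextStream("localhost", 9999)
    lines.flatMap(_.split(" ")).map((_, 1)).reduceByKey(_ + _).print()

    ssc.start()
    ssc.awaitTermination()
  }
}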
Reposted from: https://my.oschina.net/corleone/blog/679230