
In the previous lesson we walked through how a Receiver is launched. The Receiver is started via the ReceiverSupervisor's start method:

/** Start the supervisor */
def start() {
  onStart()
  startReceiver()
}

start() first calls ReceiverSupervisor's onStart() method:

override protected def onStart() {
  registeredBlockGenerators.foreach { _.start() }
}

registeredBlockGenerators is populated when the ReceiverSupervisor is instantiated:

private val defaultBlockGenerator = createBlockGenerator(defaultBlockGeneratorListener)

override def createBlockGenerator(
    blockGeneratorListener: BlockGeneratorListener): BlockGenerator = {
  // Cleanup BlockGenerators that have already been stopped
  registeredBlockGenerators --= registeredBlockGenerators.filter{ _.isStopped() }
  val newBlockGenerator = new BlockGenerator(blockGeneratorListener, streamId, env.conf)
  registeredBlockGenerators += newBlockGenerator
  newBlockGenerator
}

onStart() thus calls each BlockGenerator's start method:

/** Start block generating and pushing threads. */
def start(): Unit = synchronized {
  if (state == Initialized) {
    state = Active
    blockIntervalTimer.start()
    blockPushingThread.start()
    logInfo("Started BlockGenerator")
  } else {
    throw new SparkException(
      s"Cannot start BlockGenerator as its not in the Initialized state [state = $state]")
  }
}

blockIntervalTimer is a recurring timer that invokes the updateCurrentBuffer function on every tick:

private val blockIntervalTimer =
  new RecurringTimer(clock, blockIntervalMs, updateCurrentBuffer, "BlockGenerator")

The interval defaults to 200 milliseconds:

private val blockIntervalMs = conf.getTimeAsMs("spark.streaming.blockInterval", "200ms")
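
This interval determines how many blocks each batch is cut into, and each block becomes one partition of the batch's RDD: with a 1-second batch and the default 200 ms block interval, each batch yields about 5 blocks. Tuning it is a one-line configuration change; below is a minimal sketch (the application name and the 50ms value are purely illustrative):

import org.apache.spark.SparkConf
import org.apache.spark.streaming.{Seconds, StreamingContext}

// A smaller block interval cuts each batch into more, smaller blocks,
// i.e. more partitions (and potentially more parallelism) per batch.
val conf = new SparkConf()
  .setAppName("BlockIntervalDemo")   // illustrative name
  .set("spark.streaming.blockInterval", "50ms")
val ssc = new StreamingContext(conf, Seconds(1))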

blockPushingThread is a thread that keeps pushing data into the BlockManager:

private val blockPushingThread = new Thread() { override def run() { keepPushingBlocks() } }

/** Keep pushing blocks to the BlockManager. */
private def keepPushingBlocks() {
  logInfo("Started block pushing thread")

  def areBlocksBeingGenerated: Boolean = synchronized {
    state != StoppedGeneratingBlocks
  }

  try {
    // While blocks are being generated, keep polling for to-be-pushed blocks and push them.
    while (areBlocksBeingGenerated) {
      Option(blocksForPushing.poll(10, TimeUnit.MILLISECONDS)) match {
        case Some(block) => pushBlock(block)
        case None =>
      }
    }

    // At this point, state is StoppedGeneratingBlock. So drain the queue of to-be-pushed blocks.
    logInfo("Pushing out the last " + blocksForPushing.size() + " blocks")
    while (!blocksForPushing.isEmpty) {
      val block = blocksForPushing.take()
      logDebug(s"Pushing block $block")
      pushBlock(block)
      logInfo("Blocks left to push " + blocksForPushing.size())
    }
    logInfo("Stopped block pushing thread")
  } catch {
    case ie: InterruptedException =>
      logInfo("Block pushing thread was interrupted")
    case e: Exception =>
      reportError("Error in block pushing thread", e)
  }
}

As the code shows, the thread polls the blocksForPushing queue with a 10 ms timeout and calls pushBlock on each Block it dequeues:

private def pushBlock(block: Block) {
  listener.onPushBlock(block.id, block.buffer)
  logInfo("Pushed block " + block.id)
}

The listener here is the one defined in ReceiverSupervisorImpl:

private val defaultBlockGeneratorListener = new BlockGeneratorListener {
  def onAddData(data: Any, metadata: Any): Unit = { }

  def onGenerateBlock(blockId: StreamBlockId): Unit = { }

  def onError(message: String, throwable: Throwable) {
    reportError(message, throwable)
  }

  def onPushBlock(blockId: StreamBlockId, arrayBuffer: ArrayBuffer[_]) {
    pushArrayBuffer(arrayBuffer, None, Some(blockId))
  }
}

So onPushBlock calls the pushArrayBuffer method, which ultimately lands in the following method:

/** Store block and report it to driver */
def pushAndReportBlock(
    receivedBlock: ReceivedBlock,
    metadataOption: Option[Any],
    blockIdOption: Option[StreamBlockId]) {
  val blockId = blockIdOption.getOrElse(nextBlockId)
  val time = System.currentTimeMillis
  val blockStoreResult = receivedBlockHandler.storeBlock(blockId, receivedBlock)
  logDebug(s"Pushed block $blockId in ${(System.currentTimeMillis - time)} ms")
  val numRecords = blockStoreResult.numRecords
  val blockInfo = ReceivedBlockInfo(streamId, numRecords, metadataOption, blockStoreResult)
  trackerEndpoint.askWithRetry[Boolean](AddBlock(blockInfo))
  logDebug(s"Reported block $blockId")
}

This method hands the data to receivedBlockHandler for storage, and reports the block's metadata to the ReceiverTracker.

receivedBlockHandler has two implementations:

private val receivedBlockHandler: ReceivedBlockHandler = {
  if (WriteAheadLogUtils.enableReceiverLog(env.conf)) {
    if (checkpointDirOption.isEmpty) {
      throw new SparkException(
        "Cannot enable receiver write-ahead log without checkpoint directory set. " +
        "Please use streamingContext.checkpoint() to set the checkpoint directory. " +
        "See documentation for more details.")
    }
    new WriteAheadLogBasedBlockHandler(env.blockManager, receiver.streamId,
      receiver.storageLevel, env.conf, hadoopConf, checkpointDirOption.get)
  } else {
    new BlockManagerBasedBlockHandler(env.blockManager, receiver.storageLevel)
  }
}

Either way, the data ultimately lands in the BlockManager.
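
Which handler gets chosen is purely a configuration decision. A minimal sketch of switching on the write-ahead-log path (the application name and checkpoint path are placeholders of ours):

import org.apache.spark.SparkConf
import org.apache.spark.streaming.{Seconds, StreamingContext}

// Enabling the receiver WAL selects WriteAheadLogBasedBlockHandler;
// a checkpoint directory is mandatory, or the SparkException above is thrown.
val conf = new SparkConf()
  .setAppName("WALDemo") // placeholder name
  .set("spark.streaming.receiver.writeAheadLog.enable", "true")
val ssc = new StreamingContext(conf, Seconds(1))
ssc.checkpoint("hdfs://namenode:8020/streaming-checkpoint") // placeholder path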

blocksForPushing is defined as follows:

private val blockQueueSize = conf.getInt("spark.streaming.blockQueueSize", 10)
private val blocksForPushing = new ArrayBlockingQueue[Block](blockQueueSize)
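
Two properties of ArrayBlockingQueue matter here: put() blocks while the queue is full, which is how a slow pushing thread applies back-pressure to block generation, and poll(timeout) returns null if nothing arrives in time. A self-contained toy sketch of those semantics (the object name and string values are ours, not Spark's):

import java.util.concurrent.{ArrayBlockingQueue, TimeUnit}

object QueueDemo extends App {
  val queue = new ArrayBlockingQueue[String](2)
  queue.put("block-0")
  queue.put("block-1") // the queue is now full; a third put() would block

  // poll with a timeout, exactly the shape keepPushingBlocks uses
  Option(queue.poll(10, TimeUnit.MILLISECONDS)) match {
    case Some(block) => println(s"dequeued $block") // prints: dequeued block-0
    case None        => println("timed out, queue was empty")
  }
}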

The data in blocksForPushing is written by the blockIntervalTimer, which periodically moves the contents of the BlockGenerator's currentBuffer into the queue (note that the block ID is stamped with time - blockIntervalMs, the start of the interval the records were collected in):

/** Change the buffer to which single records are added to. */
private def updateCurrentBuffer(time: Long): Unit = {
  try {
    var newBlock: Block = null
    synchronized {
      if (currentBuffer.nonEmpty) {
        val newBlockBuffer = currentBuffer
        currentBuffer = new ArrayBuffer[Any]
        val blockId = StreamBlockId(receiverId, time - blockIntervalMs)
        listener.onGenerateBlock(blockId)
        newBlock = new Block(blockId, newBlockBuffer)
      }
    }
    if (newBlock != null) {
      blocksForPushing.put(newBlock)  // put is blocking when queue is full
    }
  } catch {
    case ie: InterruptedException =>
      logInfo("Block updating timer thread was interrupted")
    case e: Exception =>
      reportError("Error in block updating thread", e)
  }
}

Now let's go back to the supervisor's startReceiver() method:

/** Start receiver */
def startReceiver(): Unit = synchronized {
  try {
    if (onReceiverStart()) {
      logInfo("Starting receiver")
      receiverState = Started
      receiver.onStart()
      logInfo("Called receiver onStart")
    } else {
      // The driver refused us
      stop("Registered unsuccessfully because Driver refused to start receiver " + streamId, None)
    }
  } catch {
    case NonFatal(t) =>
      stop("Error starting receiver " + streamId, Some(t))
  }
}

It calls the receiver's onStart method. Take SocketReceiver as an example:

def onStart() {
  // Start the thread that receives data over a connection
  new Thread("Socket Receiver") {
    setDaemon(true)
    override def run() { receive() }
  }.start()
}

This function spawns a new daemon thread named "Socket Receiver", which on startup calls SocketReceiver's receive() method:

def receive() {
  var socket: Socket = null
  try {
    logInfo("Connecting to " + host + ":" + port)
    socket = new Socket(host, port)
    logInfo("Connected to " + host + ":" + port)
    val iterator = bytesToObjects(socket.getInputStream())
    while(!isStopped && iterator.hasNext) {
      store(iterator.next)
    }
    if (!isStopped()) {
      restart("Socket data stream had no more data")
    } else {
      logInfo("Stopped receiving")
    }
  } catch {
    case e: java.net.ConnectException =>
      restart("Error connecting to " + host + ":" + port, e)
    case NonFatal(e) =>
      logWarning("Error receiving data", e)
      restart("Error receiving data", e)
  } finally {
    if (socket != null) {
      socket.close()
      logInfo("Closed socket to " + host + ":" + port)
    }
  }
}

It builds a Socket and keeps reading from the InputStream, calling store once for each record received:

def store(dataItem: T) {
  supervisor.pushSingle(dataItem)
}

The data is managed by the ReceiverSupervisor; supervisor.pushSingle hands each record over to it:

def pushSingle(data: Any) {
  defaultBlockGenerator.addData(data)
}

defaultBlockGenerator is the same BlockGenerator instance that we saw createBlockGenerator construct earlier.

Its addData function stores each record in the BlockGenerator's currentBuffer:

/** Push a single data item into the buffer. */
def addData(data: Any): Unit = {
  if (state == Active) {
    waitToPush()
    synchronized {
      if (state == Active) {
        currentBuffer += data
      } else {
        throw new SparkException(
          "Cannot add data as BlockGenerator has not been started or has been stopped")
      }
    }
  } else {
    throw new SparkException(
      "Cannot add data as BlockGenerator has not been started or has been stopped")
  }
}
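
The waitToPush() call comes from the RateLimiter trait that BlockGenerator mixes in: when records arrive faster than the configured maximum rate, it blocks the receiving thread. Capping that rate is again just configuration; a minimal sketch (the 10000 is only an illustrative value):

import org.apache.spark.SparkConf

// At most 10000 records per second per receiver; above that rate,
// waitToPush() makes addData block the thread that calls store().
val conf = new SparkConf()
  .set("spark.streaming.receiver.maxRate", "10000")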

Data streams in continuously: every 200 ms the contents of currentBuffer are turned into a Block and moved into the blocksForPushing queue, and a fresh currentBuffer is instantiated; the blocksForPushing queue, in turn, is drained roughly every 10 ms and its blocks are written to the BlockManager.
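
Putting the whole pipeline together: a user-defined receiver only needs to implement onStart/onStop and call store(); the BlockGenerator, the blocksForPushing queue, the BlockManager write, and the report to the ReceiverTracker are all supplied by the framework. Below is a minimal sketch following the same pattern as SocketReceiver (the class name, line-based parsing, and error handling are our own choices, not Spark code):

import java.net.Socket
import scala.io.Source
import org.apache.spark.storage.StorageLevel
import org.apache.spark.streaming.receiver.Receiver

class LineReceiver(host: String, port: Int)
  extends Receiver[String](StorageLevel.MEMORY_AND_DISK_2) {

  def onStart(): Unit = {
    // Same pattern as SocketReceiver: receive on a daemon thread
    new Thread("Line Receiver") {
      setDaemon(true)
      override def run() { receiveLines() }
    }.start()
  }

  def onStop(): Unit = { } // the receiving thread exits once isStopped() is true

  private def receiveLines(): Unit = {
    var socket: Socket = null
    try {
      socket = new Socket(host, port)
      val lines = Source.fromInputStream(socket.getInputStream).getLines()
      while (!isStopped && lines.hasNext) {
        store(lines.next()) // each record flows into supervisor.pushSingle(...)
      }
      restart("Socket data stream had no more data")
    } catch {
      case e: java.io.IOException => restart("Error receiving data", e)
    } finally {
      if (socket != null) socket.close()
    }
  }
}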

Reposted from: https://my.oschina.net/corleone/blog/679230
