Special note:
The previous article described the Receiver startup process in detail. If you are not familiar with it, please read that post first; here we build on its conclusions and continue.

The goal of this post:
A complete walkthrough of the full lifecycle of data reception in Spark Streaming.

The post is organized as follows:
a) The architectural design of data reception
b) Detailed source code analysis

The architectural design of data reception
1. Once a Spark Streaming application is running, Spark Streaming keeps receiving data continuously.
2. The Receiver and the Driver generally do not run in the same process, so after receiving data the Receiver side must keep reporting back to the Driver.
3. Data reception relies on a message loop: the loop keeps accepting data, stores it, and then reports the metadata of the stored data to the Driver.
4. The data-receiving process follows an MVC-style architecture: M (Model) is the Receiver, C (Controller) is the ReceiverSupervisor, which is in charge of storage, and V (View) is the metadata held on the Driver side.
5. ReceiverSupervisor is the controller. The Receiver is started by the ReceiverTracker, and once the Receiver receives data, the ReceiverSupervisor stores it. The Driver then obtains the metadata, the "view", and operates on the underlying data through it; the metadata acts like a pointer to the data.
The overall flow of receiving data in Spark Streaming is illustrated below.
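To make this division of labor concrete before diving into the source, here is a minimal, hypothetical custom receiver sketch (the class name, the dummy record loop, and the one-second pause are illustrative and not from Spark's code base). User code only implements the Model side: it produces records and hands them to store(); the ReceiverSupervisor created by the framework takes care of buffering, block generation, storage, and reporting metadata to the Driver.

import org.apache.spark.storage.StorageLevel
import org.apache.spark.streaming.receiver.Receiver

// Hypothetical custom receiver: generates one dummy record per second and hands it to store().
class DummyReceiver extends Receiver[String](StorageLevel.MEMORY_AND_DISK_2) {

  override def onStart(): Unit = {
    // onStart must be non-blocking, so produce records on a separate daemon thread.
    val thread = new Thread("Dummy Receiver") {
      override def run(): Unit = {
        var i = 0L
        while (!isStopped()) {
          store(s"record-$i")   // handed to the ReceiverSupervisor, not stored directly
          i += 1
          Thread.sleep(1000)
        }
      }
    }
    thread.setDaemon(true)
    thread.start()
  }

  // The producing thread checks isStopped(), so there is nothing extra to clean up here.
  override def onStop(): Unit = { }
}

Such a receiver would be plugged in with ssc.receiverStream(new DummyReceiver), after which everything described below (supervisor creation, block generation, storage, and metadata reporting) is handled by the framework.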

Detailed source code analysis
1. ReceiverTracker launches receivers by submitting Spark jobs: each such job has exactly one task, and that task starts a single Receiver through a ReceiverSupervisor.
2. The diagram below shows the flow of Receiver startup; today's journey starts from ReceiverTracker's start method.
3. The start method creates the endpoint instance:

/** Start the endpoint and receiver execution thread. */
def start(): Unit = synchronized {
  if (isTrackerStarted) {
    throw new SparkException("ReceiverTracker already started")
  }

  if (!receiverInputStreams.isEmpty) {
    endpoint = ssc.env.rpcEnv.setupEndpoint(
      "ReceiverTracker", new ReceiverTrackerEndpoint(ssc.env.rpcEnv))
    if (!skipReceiverLaunch) launchReceivers()
    logInfo("ReceiverTracker started")
    trackerState = Started
  }
}
4. The launchReceivers source code is as follows:

/**
 * Get the receivers from the ReceiverInputDStreams, distributes them to the
 * worker nodes as a parallel collection, and runs them.
 */
private def launchReceivers(): Unit = {
  val receivers = receiverInputStreams.map(nis => {
    val rcvr = nis.getReceiver()
    rcvr.setReceiverId(nis.id)
    rcvr
  })

  runDummySparkJob()

  logInfo("Starting " + receivers.length + " receivers")
  // The endpoint here is the ReceiverTrackerEndpoint instantiated earlier
  endpoint.send(StartAllReceivers(receivers))
}
5. As the diagram shows, once send is called the message is picked up by ReceiverTrackerEndpoint's receive method:

override def receive: PartialFunction[Any, Unit] = {
  // Local messages
  case StartAllReceivers(receivers) =>
    val scheduledLocations = schedulingPolicy.scheduleReceivers(receivers, getExecutors)
    for (receiver <- receivers) {
      val executors = scheduledLocations(receiver.streamId)
      updateReceiverScheduledExecutors(receiver.streamId, executors)
      receiverPreferredLocations(receiver.streamId) = receiver.preferredLocation
      startReceiver(receiver, executors)
    }
6. The startReceiver source code is as follows:

/**
 * Start a receiver along with its scheduled executors
 */
private def startReceiver(
    receiver: Receiver[_],
    scheduledLocations: Seq[TaskLocation]): Unit = {
  def shouldStartReceiver: Boolean = {
    // It's okay to start when trackerState is Initialized or Started
    !(isTrackerStopping || isTrackerStopped)
  }

  val receiverId = receiver.streamId
  if (!shouldStartReceiver) {
    onReceiverJobFinish(receiverId)
    return
  }

  val checkpointDirOption = Option(ssc.checkpointDir)
  val serializableHadoopConf =
    new SerializableConfiguration(ssc.sparkContext.hadoopConfiguration)

  // startReceiverFunc is the function that the submitted RDD job will run.
  // Function to start the receiver on the worker node.
  // Although the parameter is an Iterator, it contains exactly one Receiver: tracing back,
  // startReceiver is called inside a for loop that iterates over the receivers one at a time.
  val startReceiverFunc: Iterator[Receiver[_]] => Unit =
    (iterator: Iterator[Receiver[_]]) => {
      if (!iterator.hasNext) {
        throw new SparkException(
          "Could not start receiver as object not found.")
      }
      if (TaskContext.get().attemptNumber() == 0) {
        val receiver = iterator.next()
        assert(iterator.hasNext == false)
        // The receiver here was created by the InputDStream of the input source.
        // For example, SocketInputDStream has its own receiver, SocketReceiver.
        // At this point the receiver is just a handle, a description of the reference.
        val supervisor = new ReceiverSupervisorImpl(
          receiver, SparkEnv.get, serializableHadoopConf.value, checkpointDirOption)
        // When startReceiverFunc is invoked, ReceiverSupervisorImpl's start method runs.
        supervisor.start()
        supervisor.awaitTermination()
      } else {
        // It's restarted by TaskScheduler, but we want to reschedule it again. So exit it.
      }
    }

  // Create the RDD using the scheduledLocations to run the receiver in a Spark job
  val receiverRDD: RDD[Receiver[_]] =
    if (scheduledLocations.isEmpty) {
      // Seq(receiver) contains exactly one Receiver
      ssc.sc.makeRDD(Seq(receiver), 1)
    } else {
      val preferredLocations = scheduledLocations.map(_.toString).distinct
      ssc.sc.makeRDD(Seq(receiver -> preferredLocations))
    }
  // An RDD created solely for the purpose of launching the receiver
  receiverRDD.setName(s"Receiver $receiverId")
  ssc.sparkContext.setJobDescription(s"Streaming job running receiver $receiverId")
  ssc.sparkContext.setCallSite(Option(ssc.getStartSite()).getOrElse(Utils.getCallSite()))

  val future = ssc.sparkContext.submitJob[Receiver[_], Unit, Unit](
    receiverRDD, startReceiverFunc, Seq(0), (_, _) => Unit, ())
  // We will keep restarting the receiver job until ReceiverTracker is stopped
  future.onComplete {
    case Success(_) =>
      if (!shouldStartReceiver) {
        onReceiverJobFinish(receiverId)
      } else {
        logInfo(s"Restarting Receiver $receiverId")
        self.send(RestartReceiver(receiver))
      }
    case Failure(e) =>
      if (!shouldStartReceiver) {
        onReceiverJobFinish(receiverId)
      } else {
        logError("Receiver has been stopped. Try to restart it.", e)
        logInfo(s"Restarting Receiver $receiverId")
        self.send(RestartReceiver(receiver))
      }
  }(submitJobThreadPool)
  logInfo(s"Receiver ${receiver.streamId} started")
}
7. Now let's trace how the receiver parameter is passed, starting from where startReceiver is called:

override def receive: PartialFunction[Any, Unit] = {
  // Local messages
  case StartAllReceivers(receivers) =>
    val scheduledLocations = schedulingPolicy.scheduleReceivers(receivers, getExecutors)
    for (receiver <- receivers) {
      val executors = scheduledLocations(receiver.streamId)
      updateReceiverScheduledExecutors(receiver.streamId, executors)
      receiverPreferredLocations(receiver.streamId) = receiver.preferredLocation
      startReceiver(receiver, executors)
    }
8. As we can see, the receiver comes in via the StartAllReceivers message. Tracing StartAllReceivers back, the receiver objects are obtained through getReceiver:

/**
 * Get the receivers from the ReceiverInputDStreams, distributes them to the
 * worker nodes as a parallel collection, and runs them.
 */
private def launchReceivers(): Unit = {
  val receivers = receiverInputStreams.map(nis => {
    val rcvr = nis.getReceiver()
    rcvr.setReceiverId(nis.id)
    rcvr
  })

  runDummySparkJob()

  logInfo("Starting " + receivers.length + " receivers")
  endpoint.send(StartAllReceivers(receivers))
}

When submitJob is called, the job is submitted and runs on a specific worker node; from that point the work is carried out by ReceiverSupervisorImpl.
When ReceiverTracker's startReceiver is invoked, two things are accomplished:

the initialization of ReceiverSupervisorImpl, and the invocation of its start method.

Step 1: Initialization of ReceiverSupervisorImpl
1. ReceiverSupervisor is responsible for handling the data received by the Receiver: it stores the data and then reports to the Driver. The Receiver receives data record by record (for Kafka, in key-value form).

/**
 * Concrete implementation of org.apache.spark.streaming.receiver.ReceiverSupervisor
 * which provides all the necessary functionality for handling the data received by
 * the receiver. Specifically, it creates a org.apache.spark.streaming.receiver.BlockGenerator
 * object that is used to divide the received data stream into blocks of data.
 */
private[streaming] class ReceiverSupervisorImpl(
2. The ReceiverSupervisorImpl initialization source code is as follows:

/** Remote RpcEndpointRef for the ReceiverTracker */
// The message-communication reference used to talk to the ReceiverTracker on the Driver
private val trackerEndpoint = RpcUtils.makeDriverRef("ReceiverTracker", env.conf, env.rpcEnv)

/** RpcEndpointRef for receiving messages from the ReceiverTracker in the driver */
// endpoint runs on the executor side and receives the messages sent by the Driver's ReceiverTracker
private val endpoint = env.rpcEnv.setupEndpoint(
  "Receiver-" + streamId + "-" + System.currentTimeMillis(), new ThreadSafeRpcEndpoint {
    override val rpcEnv: RpcEnv = env.rpcEnv

    override def receive: PartialFunction[Any, Unit] = {
      case StopReceiver =>
        logInfo("Received stop signal")
        ReceiverSupervisorImpl.this.stop("Stopped by driver", None)
      // After each batch is processed, the Driver's ReceiverTracker sends a message
      // to ReceiverSupervisorImpl asking it to clean up old Block information.
      case CleanupOldBlocks(threshTime) =>
        logDebug("Received delete old batch signal")
        cleanupOldBlocks(threshTime)
      // Rate limiting for the receiver: this allows the receiver's ingestion rate
      // to be changed dynamically.
      case UpdateRateLimit(eps) =>
        logInfo(s"Received a new rate limit: $eps.")
        registeredBlockGenerators.foreach { bg =>
          bg.updateRate(eps)
        }
    }
  })

The cleanupOldBlocks source code is as follows:

private def cleanupOldBlocks(cleanupThreshTime: Time): Unit = {
  logDebug(s"Cleaning up blocks older then $cleanupThreshTime")
  receivedBlockHandler.cleanupOldBlocks(cleanupThreshTime.milliseconds)
}
3. updateRate applies the new rate limit carried by the message. This matters a great deal in production: sometimes the volume of incoming data is far more than the cluster can process in time, and without rate limiting the latency becomes very high.

/**
 * Set the rate limit to `newRate`. The new rate will not exceed the maximum rate configured by
 * `spark.streaming.receiver.maxRate`, even if `newRate` is higher than that.
 *
 * @param newRate A new rate in events per second. It has no effect if it's 0 or negative.
 */
private[receiver] def updateRate(newRate: Long): Unit =
  if (newRate > 0) {
    if (maxRateLimit > 0) {
      rateLimiter.setRate(newRate.min(maxRateLimit))
    } else {
      rateLimiter.setRate(newRate)
    }
  }
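The ceiling mentioned in the comment comes from configuration. A hedged sketch of the relevant settings (the values are illustrative): spark.streaming.receiver.maxRate caps each receiver at a fixed number of records per second, while spark.streaming.backpressure.enabled lets Spark compute newRate dynamically from recent batch statistics and push it down to the receiver through the UpdateRateLimit message shown above.

import org.apache.spark.SparkConf

// Illustrative values only; tune them for your own workload.
val conf = new SparkConf()
  .setAppName("receiver-rate-limit-demo")
  // Hard upper bound: at most 10,000 records per second per receiver.
  .set("spark.streaming.receiver.maxRate", "10000")
  // Let Spark derive the actual rate from batch scheduling delay and processing time.
  .set("spark.streaming.backpressure.enabled", "true")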

At this point the initialization of ReceiverSupervisorImpl is complete. This was only a brief overview; more detail follows later.

Step 2: Invocation of ReceiverSupervisorImpl's start method.
ReceiverSupervisorImpl itself does not define a start method; the implementation lives in the start method of its parent class, ReceiverSupervisor.

4. When the supervisor is started, ReceiverSupervisor's start method is called:

/** Start the supervisor */
def start() {
  onStart()
  startReceiver()
}
5. The onStart method: it must be called before receiver.onStart() to ensure the BlockGenerator is instantiated and started. The Receiver receives data record by record, and those records need to be merged into Blocks by the BlockGenerator, because the unit an RDD processes is a Block.

/**
 * Called when supervisor is started.
 * Note that this must be called before the receiver.onStart() is called to ensure
 * things like BlockGenerators are started before the receiver starts sending data.
 */
protected def onStart() { }
6. The concrete implementation of onStart is in ReceiverSupervisorImpl:

override protected def onStart() {
  registeredBlockGenerators.foreach { _.start() }
}

What is a BlockGenerator?
It gathers the received data into batches and stores them at a fixed interval.
BlockGenerator starts two threads:
1. One thread periodically merges the data received by the Receiver into Blocks.
2. The other thread stores the generated Blocks via the BlockManager.
BlockGenerator extends RateLimiter. It cannot limit the rate of the incoming stream directly, but it can limit the rate at which data is stored, which in turn throttles how fast data flows in.

/**
 * Generates batches of objects received by a
 * org.apache.spark.streaming.receiver.Receiver and puts them into appropriately
 * named blocks at regular intervals. This class starts two threads,
 * one to periodically start a new batch and prepare the previous batch of as a block,
 * the other to push the blocks into the block manager.
 *
 * Note: Do not create BlockGenerator instances directly inside receivers. Use
 * `ReceiverSupervisor.createBlockGenerator` to create a BlockGenerator and use it.
 */
private[streaming] class BlockGenerator(
    listener: BlockGeneratorListener,
    receiverId: Int,
    conf: SparkConf,
    clock: Clock = new SystemClock())
  extends RateLimiter(conf) with Logging {

How is a BlockGenerator created?
7. The creation of the BlockGenerator is implemented in ReceiverSupervisorImpl's createBlockGenerator method:

override def createBlockGenerator(
    blockGeneratorListener: BlockGeneratorListener): BlockGenerator = {
  // Cleanup BlockGenerators that have already been stopped
  registeredBlockGenerators --= registeredBlockGenerators.filter{ _.isStopped() }

  // Each streamId is served by exactly one BlockGenerator
  val newBlockGenerator = new BlockGenerator(blockGeneratorListener, streamId, env.conf)
  registeredBlockGenerators += newBlockGenerator
  newBlockGenerator
}
8. Back to ReceiverSupervisorImpl's onStart method above:

override protected def onStart() {
  // Starting the BlockGenerator kicks off a timer that keeps putting received records
  // into an in-memory buffer and merging that buffer into Blocks. At this point we are
  // only getting ready to receive the Receiver's data.
  registeredBlockGenerators.foreach { _.start() }
}
9. BlockGenerator's start method launches the BlockGenerator's two threads:

/** Start block generating and pushing threads. */
def start(): Unit = synchronized {
  if (state == Initialized) {
    state = Active
    blockIntervalTimer.start()
    blockPushingThread.start()
    logInfo("Started BlockGenerator")
  } else {
    throw new SparkException(
      s"Cannot start BlockGenerator as its not in the Initialized state [state = $state]")
  }
}
10. blockIntervalTimer is a RecurringTimer instance:

private val blockIntervalTimer =
  new RecurringTimer(clock, blockIntervalMs, updateCurrentBuffer, "BlockGenerator")
11. blockIntervalTimer's start method:

/**
 * Start at the earliest time it can start based on the period.
 */
def start(): Long = {
  start(getStartTime())
}
12. Start the thread:

/**
 * Start at the given start time.
 */
def start(startTime: Long): Long = synchronized {
  nextTime = startTime
  thread.start()
  logInfo("Started timer for " + name + " at time " + nextTime)
  nextTime
}
13. The thread's run method invokes loop:

private val thread = new Thread("RecurringTimer - " + name) {
  setDaemon(true)
  override def run() { loop }
}
14. loop repeatedly calls triggerActionForNextInterval():

/**
 * Repeatedly call the callback every interval.
 */
private def loop() {
  try {
    while (!stopped) {
      triggerActionForNextInterval()
    }
    triggerActionForNextInterval()
  } catch {
    case e: InterruptedException =>
  }
}


15. The callback then invokes the updateCurrentBuffer method:

private def triggerActionForNextInterval(): Unit = {
  clock.waitTillTime(nextTime)
  callback(nextTime)
  prevTime = nextTime
  nextTime += period
  logDebug("Callback for " + name + " called at time " + prevTime)
}
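Putting the last few steps together, the timer is just a daemon thread that sleeps until the next tick and then invokes the callback. A standalone analogue (a hypothetical class, not Spark's private RecurringTimer) captures the same mechanism:

// A minimal recurring-timer sketch: call `callback(time)` every `periodMs` milliseconds.
class SimpleRecurringTimer(periodMs: Long, callback: Long => Unit, name: String) {
  @volatile private var stopped = false
  private var nextTime = 0L

  private val thread = new Thread("SimpleRecurringTimer - " + name) {
    setDaemon(true)
    override def run(): Unit = {
      while (!stopped) {
        val sleepMs = nextTime - System.currentTimeMillis()
        if (sleepMs > 0) Thread.sleep(sleepMs)   // wait until the scheduled tick
        callback(nextTime)                       // e.g. updateCurrentBuffer in BlockGenerator
        nextTime += periodMs
      }
    }
  }

  def start(): Unit = {
    nextTime = System.currentTimeMillis() + periodMs
    thread.start()
  }

  def stop(): Unit = stopped = true
}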
16. When the RecurringTimer instance was created, the third constructor argument passed in was the updateCurrentBuffer method:

private[streaming]
class RecurringTimer(clock: Clock, period: Long, callback: (Long) => Unit, name: String)
  extends Logging {
17. Received records are first put into the buffer, and at each block interval the buffer contents are merged into a Block:

/** Change the buffer to which single records are added to. */
private def updateCurrentBuffer(time: Long): Unit = {
  try {
    var newBlock: Block = null
    synchronized {
      if (currentBuffer.nonEmpty) {
        val newBlockBuffer = currentBuffer
        currentBuffer = new ArrayBuffer[Any]
        val blockId = StreamBlockId(receiverId, time - blockIntervalMs)
        listener.onGenerateBlock(blockId)
        newBlock = new Block(blockId, newBlockBuffer)
      }
    }

    if (newBlock != null) {
      // Put the newly generated Block into the queue
      blocksForPushing.put(newBlock)  // put is blocking when queue is full
    }
  } catch {
    case ie: InterruptedException =>
      logInfo("Block updating timer thread was interrupted")
    case e: Exception =>
      reportError("Error in block updating thread", e)
  }
}

This completes the analysis of BlockGenerator's start method; at this point everything is ready to receive the Receiver's data.
The BlockGenerator startup flow is shown below.

This completes ReceiverSupervisorImpl's onStart method.

18. Back to ReceiverSupervisor's start method:

/** Start the supervisor */
def start() {
  onStart()
  startReceiver()
}
19. Start the receiver:

/** Start receiver */
def startReceiver(): Unit = synchronized {
  try {
    if (onReceiverStart()) {
      logInfo("Starting receiver")
      receiverState = Started
      receiver.onStart()
      logInfo("Called receiver onStart")
    } else {
      // The driver refused us
      stop("Registered unsuccessfully because Driver refused to start receiver " + streamId, None)
    }
  } catch {
    case NonFatal(t) =>
      stop("Error starting receiver " + streamId, Some(t))
  }
}

The onReceiverStart method is implemented in ReceiverSupervisorImpl:

override protected def onReceiverStart(): Boolean = {
  // The endpoint here is the message loop of ReceiverSupervisorImpl, the Receiver's manager
  val msg = RegisterReceiver(
    streamId, receiver.getClass.getSimpleName, host, executorId, endpoint)
  // The message is sent to the Driver-side ReceiverTracker endpoint; only when the returned
  // Boolean is true can startReceiver and the subsequent work proceed.
  trackerEndpoint.askWithRetry[Boolean](msg)
}

On the Driver side, the ReceiverTrackerEndpoint receives the message sent by the ReceiverSupervisorImpl.

20. It is handled in receiveAndReply; the source code is as follows:

override def receiveAndReply(context: RpcCallContext): PartialFunction[Any, Unit] = {
  // Remote messages
  case RegisterReceiver(streamId, typ, host, executorId, receiverEndpoint) =>
    val successful =
      registerReceiver(streamId, typ, host, executorId, receiverEndpoint, context.senderAddress)
    context.reply(successful)
  case AddBlock(receivedBlockInfo) =>
    if (WriteAheadLogUtils.isBatchingEnabled(ssc.conf, isDriver = true)) {
      walBatchingThreadPool.execute(new Runnable {
        override def run(): Unit = Utils.tryLogNonFatalError {
          if (active) {
            context.reply(addBlock(receivedBlockInfo))
          } else {
            throw new IllegalStateException("ReceiverTracker RpcEndpoint shut down.")
          }
        }
      })
    } else {
      context.reply(addBlock(receivedBlockInfo))
    }
  case DeregisterReceiver(streamId, message, error) =>
    deregisterReceiver(streamId, message, error)
    context.reply(true)
  // Local messages
  case AllReceiverIds =>
    context.reply(receiverTrackingInfos.filter(_._2.state != ReceiverState.INACTIVE).keys.toSeq)
  case StopAllReceivers =>
    assert(isTrackerStopping || isTrackerStopped)
    stopReceivers()
    context.reply(true)
}
21. The registerReceiver source code is as follows:

/** Register a receiver */
private def registerReceiver(
    streamId: Int,
    typ: String,
    host: String,
    executorId: String,
    receiverEndpoint: RpcEndpointRef,
    senderAddress: RpcAddress
  ): Boolean = {
  // Check whether the streamId belongs to one of the known receiver input streams
  if (!receiverInputStreamIds.contains(streamId)) {
    throw new SparkException("Register received for unexpected id " + streamId)
  }

  if (isTrackerStopping || isTrackerStopped) {
    return false
  }

  val scheduledLocations = receiverTrackingInfos(streamId).scheduledLocations
  val acceptableExecutors = if (scheduledLocations.nonEmpty) {
      // This receiver is registering and it's scheduled by
      // ReceiverSchedulingPolicy.scheduleReceivers. So use "scheduledLocations" to check it.
      scheduledLocations.get
    } else {
      // This receiver is scheduled by "ReceiverSchedulingPolicy.rescheduleReceiver", so calling
      // "ReceiverSchedulingPolicy.rescheduleReceiver" again to check it.
      scheduleReceiver(streamId)
    }

  def isAcceptable: Boolean = acceptableExecutors.exists {
    case loc: ExecutorCacheTaskLocation => loc.executorId == executorId
    case loc: TaskLocation => loc.host == host
  }

  if (!isAcceptable) {
    // Refuse it since it's scheduled to a wrong executor
    false
  } else {
    val name = s"${typ}-${streamId}"
    val receiverTrackingInfo = ReceiverTrackingInfo(
      streamId,
      ReceiverState.ACTIVE,
      scheduledLocations = None,
      runningExecutor = Some(ExecutorCacheTaskLocation(host, executorId)),
      name = Some(name),
      endpoint = Some(receiverEndpoint))
    receiverTrackingInfos.put(streamId, receiverTrackingInfo)
    listenerBus.post(StreamingListenerReceiverStarted(receiverTrackingInfo.toReceiverInfo))
    logInfo("Registered receiver for stream " + streamId + " from " + senderAddress)
    true
  }
}
22. A Receiver's onStart method must initialize all the resources (threads, buffers, etc.) needed for receiving data, and it must be non-blocking:

/**
 * This method is called by the system when the receiver is started. This function
 * must initialize all resources (threads, buffers, etc.) necessary for receiving data.
 * This function must be non-blocking, so receiving the data must occur on a different
 * thread. Received data can be stored with Spark by calling `store(data)`.
 *
 * If there are errors in threads started here, then following options can be done
 * (i) `reportError(...)` can be called to report the error to the driver.
 * The receiving of data will continue uninterrupted.
 * (ii) `stop(...)` can be called to stop receiving data. This will call `onStop()` to
 * clear up all resources allocated (threads, buffers, etc.) during `onStart()`.
 * (iii) `restart(...)` can be called to restart the receiver. This will call `onStop()`
 * immediately, and then `onStart()` after a delay.
 */
def onStart()
23. Take SocketReceiver as a concrete example: its onStart launches a thread that receives data over the connection:

def onStart() {
  // Start the thread that receives data over a connection
  new Thread("Socket Receiver") {
    setDaemon(true)
    override def run() { receive() }
  }.start()
}
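For reference, this SocketReceiver is exactly what the socketTextStream API gives the user; the following sketch (host, port, and batch interval are illustrative) creates the SocketInputDStream whose getReceiver() returns the SocketReceiver shown above:

import org.apache.spark.SparkConf
import org.apache.spark.storage.StorageLevel
import org.apache.spark.streaming.{Seconds, StreamingContext}

val conf = new SparkConf().setAppName("socket-receiver-demo").setMaster("local[2]")
val ssc = new StreamingContext(conf, Seconds(2))

// Creates a SocketInputDStream; its getReceiver() returns a SocketReceiver.
val lines = ssc.socketTextStream("localhost", 9999, StorageLevel.MEMORY_AND_DISK_SER_2)
lines.flatMap(_.split(" ")).map((_, 1)).reduceByKey(_ + _).print()

ssc.start()
ssc.awaitTermination()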
24. While receiving, the data is continuously stored:

/** Create a socket connection and receive data until receiver is stopped */
def receive() {
  var socket: Socket = null
  try {
    logInfo("Connecting to " + host + ":" + port)
    socket = new Socket(host, port)
    logInfo("Connected to " + host + ":" + port)
    val iterator = bytesToObjects(socket.getInputStream())
    while(!isStopped && iterator.hasNext) {
      store(iterator.next)
    }
    if (!isStopped()) {
      restart("Socket data stream had no more data")
    } else {
      logInfo("Stopped receiving")
    }
  } catch {
    case e: java.net.ConnectException =>
      restart("Error connecting to " + host + ":" + port, e)
    case NonFatal(e) =>
      logWarning("Error receiving data", e)
      restart("Error receiving data", e)
  } finally {
    if (socket != null) {
      socket.close()
      logInfo("Closed socket to " + host + ":" + port)
    }
  }
}
25. store hands the data over to the ReceiverSupervisorImpl for storage:

/**
 * Store a single item of received data to Spark's memory.
 * These single items will be aggregated together into data blocks before
 * being pushed into Spark's memory.
 */
def store(dataItem: T) {
  supervisor.pushSingle(dataItem)
}
26. Ultimately, BlockGenerator's addData method is called to buffer the data:

/**
 * Push a single data item into the buffer.
 */
def addData(data: Any): Unit = {
  if (state == Active) {
    waitToPush()
    synchronized {
      if (state == Active) {
        // currentBuffer keeps accumulating the incoming records
        currentBuffer += data
      } else {
        throw new SparkException(
          "Cannot add data as BlockGenerator has not been started or has been stopped")
      }
    }
  } else {
    throw new SparkException(
      "Cannot add data as BlockGenerator has not been started or has been stopped")
  }
}
27. currentBuffer is an ArrayBuffer:
@volatile private var currentBuffer = new ArrayBuffer[Any]

At this point we know how Spark Streaming receives data through a Receiver, but when is that data turned into Blocks?
The conversion into Blocks is done by the BlockGenerator.
1. In BlockGenerator's start method, a timer keeps turning the buffered data into Blocks:

/** Start block generating and pushing threads. */
def start(): Unit = synchronized {
  if (state == Initialized) {
    state = Active
    blockIntervalTimer.start()
    blockPushingThread.start()
    logInfo("Started BlockGenerator")
  } else {
    throw new SparkException(
      s"Cannot start BlockGenerator as its not in the Initialized state [state = $state]")
  }
}
2. blockIntervalTimer is assigned as follows:

private val blockIntervalTimer =
  new RecurringTimer(clock, blockIntervalMs, updateCurrentBuffer, "BlockGenerator")
3. Updating the buffer:

/** Change the buffer to which single records are added to. */
private def updateCurrentBuffer(time: Long): Unit = {
  try {
    var newBlock: Block = null
    synchronized {
      if (currentBuffer.nonEmpty) {
        val newBlockBuffer = currentBuffer
        currentBuffer = new ArrayBuffer[Any]
        val blockId = StreamBlockId(receiverId, time - blockIntervalMs)
        listener.onGenerateBlock(blockId)
        // Generate the Block
        newBlock = new Block(blockId, newBlockBuffer)
      }
    }

    if (newBlock != null) {
      // If the Block was generated successfully, put it into the queue
      blocksForPushing.put(newBlock)  // put is blocking when queue is full
    }
  } catch {
    case ie: InterruptedException =>
      logInfo("Block updating timer thread was interrupted")
    case e: Exception =>
      reportError("Error in block updating thread", e)
  }
}
4. blocksForPushing is defined as follows:
private val blocksForPushing = new ArrayBlockingQueue[Block](blockQueueSize)
5. By default, Spark generates a Block every 200 ms:
private val blockIntervalMs = conf.getTimeAsMs("spark.streaming.blockInterval", "200ms")
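This default matters for parallelism: each Block becomes one partition of the batch's BlockRDD, so the number of partitions (and hence tasks) per batch for a receiver is roughly the batch interval divided by the block interval. A hedged example (the numbers are illustrative):

import org.apache.spark.SparkConf

// With a 2 s batch interval, a 200 ms block interval yields about 10 blocks,
// i.e. about 10 partitions (tasks) per batch for this receiver.
// Lowering it to 50 ms raises that to about 40 partitions.
val conf = new SparkConf()
  .setAppName("block-interval-demo")
  .set("spark.streaming.blockInterval", "50ms")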

How does the data in the queue get stored away through the BlockManager?

6. In BlockGenerator's start method, blockPushingThread is started to push the Blocks out for storage:

/** Start block generating and pushing threads. */
def start(): Unit = synchronized {
  if (state == Initialized) {
    state = Active
    blockIntervalTimer.start()
    blockPushingThread.start()
    logInfo("Started BlockGenerator")
  } else {
    throw new SparkException(
      s"Cannot start BlockGenerator as its not in the Initialized state [state = $state]")
  }
}
7. When blockPushingThread runs, it keeps calling keepPushingBlocks:
private val blockPushingThread = new Thread() { override def run() { keepPushingBlocks() } }
8. It keeps taking Blocks out of the queue and stores them via the BlockManager:

/** Keep pushing blocks to the BlockManager. */
private def keepPushingBlocks() {
  logInfo("Started block pushing thread")

  def areBlocksBeingGenerated: Boolean = synchronized {
    state != StoppedGeneratingBlocks
  }

  try {
    // While blocks are being generated, keep polling for to-be-pushed blocks and push them.
    while (areBlocksBeingGenerated) {
      // Poll the queue every 10 ms to see whether there is a Block to push
      Option(blocksForPushing.poll(10, TimeUnit.MILLISECONDS)) match {
        case Some(block) => pushBlock(block)
        case None =>
      }
    }

    // At this point, state is StoppedGeneratingBlock. So drain the queue of to-be-pushed blocks.
    logInfo("Pushing out the last " + blocksForPushing.size() + " blocks")
    while (!blocksForPushing.isEmpty) {
      val block = blocksForPushing.take()
      logDebug(s"Pushing block $block")
      pushBlock(block)
      logInfo("Blocks left to push " + blocksForPushing.size())
    }
    logInfo("Stopped block pushing thread")
  } catch {
    case ie: InterruptedException =>
      logInfo("Block pushing thread was interrupted")
    case e: Exception =>
      reportError("Error in block pushing thread", e)
  }
}
9. The pushBlock source code is as follows:

private def pushBlock(block: Block) {
  // The listener here is the BlockGeneratorListener
  listener.onPushBlock(block.id, block.buffer)
  logInfo("Pushed block " + block.id)
}
10. This listener is passed in when the BlockGenerator is constructed:

private[streaming] class BlockGenerator(
    listener: BlockGeneratorListener,
    receiverId: Int,
    conf: SparkConf,
    clock: Clock = new SystemClock())
  extends RateLimiter(conf) with Logging {
11. In ReceiverSupervisorImpl, the onPushBlock invoked above is implemented by the default listener:

/** Divides received data records into data blocks for pushing in BlockManager. */
private val defaultBlockGeneratorListener = new BlockGeneratorListener {
  def onAddData(data: Any, metadata: Any): Unit = { }

  def onGenerateBlock(blockId: StreamBlockId): Unit = { }

  def onError(message: String, throwable: Throwable) {
    reportError(message, throwable)
  }

  def onPushBlock(blockId: StreamBlockId, arrayBuffer: ArrayBuffer[_]) {
    pushArrayBuffer(arrayBuffer, None, Some(blockId))
  }
}
12. The pushArrayBuffer source code is as follows:

/** Store an ArrayBuffer of received data as a data block into Spark's memory. */
def pushArrayBuffer(
    arrayBuffer: ArrayBuffer[_],
    metadataOption: Option[Any],
    blockIdOption: Option[StreamBlockId]
  ) {
  pushAndReportBlock(ArrayBufferBlock(arrayBuffer), metadataOption, blockIdOption)
}
13. The pushAndReportBlock source code is as follows:

/** Store block and report it to driver */
def pushAndReportBlock(
    receivedBlock: ReceivedBlock,
    metadataOption: Option[Any],
    blockIdOption: Option[StreamBlockId]
  ) {
  val blockId = blockIdOption.getOrElse(nextBlockId)
  val time = System.currentTimeMillis
  val blockStoreResult = receivedBlockHandler.storeBlock(blockId, receivedBlock)
  logDebug(s"Pushed block $blockId in ${(System.currentTimeMillis - time)} ms")
  val numRecords = blockStoreResult.numRecords
  val blockInfo = ReceivedBlockInfo(streamId, numRecords, metadataOption, blockStoreResult)
  // Report the metadata of the stored block to the Driver
  trackerEndpoint.askWithRetry[Boolean](AddBlock(blockInfo))
  logDebug(s"Reported block $blockId")
}
14. ReceivedBlockHandler is responsible for storing the data Blocks received by the receiver:

/** Trait that represents a class that handles the storage of blocks received by receiver */
private[streaming] trait ReceivedBlockHandler {

  /** Store a received block with the given block id and return related metadata */
  def storeBlock(blockId: StreamBlockId, receivedBlock: ReceivedBlock): ReceivedBlockStoreResult

  /** Cleanup old blocks older than the given threshold time */
  def cleanupOldBlocks(threshTime: Long)
}
15. There are two kinds of storage handlers. The one shown here, BlockManagerBasedBlockHandler, stores the received blocks through the BlockManager with the specified storage level:

/**
 * Implementation of a org.apache.spark.streaming.receiver.ReceivedBlockHandler which
 * stores the received blocks into a block manager with the specified storage level.
 */
private[streaming] class BlockManagerBasedBlockHandler(
    blockManager: BlockManager, storageLevel: StorageLevel)
  extends ReceivedBlockHandler with Logging {

  def storeBlock(blockId: StreamBlockId, block: ReceivedBlock): ReceivedBlockStoreResult = {

    var numRecords = None: Option[Long]

    val putResult: Seq[(BlockId, BlockStatus)] = block match {
      case ArrayBufferBlock(arrayBuffer) =>
        numRecords = Some(arrayBuffer.size.toLong)
        blockManager.putIterator(blockId, arrayBuffer.iterator, storageLevel,
          tellMaster = true)
      case IteratorBlock(iterator) =>
        val countIterator = new CountingIterator(iterator)
        val putResult = blockManager.putIterator(blockId, countIterator, storageLevel,
          tellMaster = true)
        numRecords = countIterator.count
        putResult
      case ByteBufferBlock(byteBuffer) =>
        blockManager.putBytes(blockId, byteBuffer, storageLevel, tellMaster = true)
      case o =>
        throw new SparkException(
          s"Could not store $blockId to block manager, unexpected block type ${o.getClass.getName}")
    }
    if (!putResult.map { _._1 }.contains(blockId)) {
      throw new SparkException(
        s"Could not store $blockId to block manager with storage level $storageLevel")
    }
    BlockManagerBasedStoreResult(blockId, numRecords)
  }
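The other kind is WriteAheadLogBasedBlockHandler, which additionally writes every received block to a write-ahead log under the checkpoint directory so that buffered data can survive failures. A hedged sketch of how it is switched on (the paths and intervals are illustrative):

import org.apache.spark.SparkConf
import org.apache.spark.streaming.{Seconds, StreamingContext}

val conf = new SparkConf()
  .setAppName("wal-demo")
  // Makes ReceiverSupervisorImpl pick WriteAheadLogBasedBlockHandler instead of
  // BlockManagerBasedBlockHandler.
  .set("spark.streaming.receiver.writeAheadLog.enable", "true")

val ssc = new StreamingContext(conf, Seconds(2))
// The WAL files are written under the checkpoint directory, so one must be configured.
ssc.checkpoint("hdfs:///tmp/streaming-checkpoint")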

The complete flow of ReceiverSupervisor's startReceiver is shown below.

The overall flow of Spark Streaming continuously receiving stream data is shown below.

This article was originally published at http://blog.csdn.net/snail_gesture/article/details/51479015

Reposted from: https://blog.51cto.com/love205088/1784658
