LiveListenBus是spark提供的异步消息总线。

private val queues = new CopyOnWriteArrayList[AsyncEventQueue]()@volatile private[scheduler] var queuedEvents = new mutable.ListBuffer[SparkListenerEvent]()

在LiveListenBus中,通过线程安全的CopyOnWriteArrayList来保存AsyncEventQueue,AsyncEventQueue为异步的消息队列,具体的监听器listener保存在其中。

queuedEvents为保存当消息总校还没有启动完毕,但是消息时间已经发布到消息总线时的情况,这时候的消息会直接保存在这个ListBuffer中,直到启动后被发布到消息总线中。

def start(sc: SparkContext, metricsSystem: MetricsSystem): Unit = synchronized {if (!started.compareAndSet(false, true)) {throw new IllegalStateException("LiveListenerBus already started.")}this.sparkContext = scqueues.asScala.foreach { q =>q.start(sc)queuedEvents.foreach(q.post)}queuedEvents = nullmetricsSystem.registerSource(metrics)
}

看到其start()方法,当其start()方法调用,代表该异步消息总线开始工作,在开始的过程中,一次调用所有queues中的AsyncEventQueue的start()方法,表示各个异步消息队列开始工作,并将上文提到分queuedEvents容器中的所有存放的消息时间发布到各个启动的异步队列中并在发布完毕所有存量事件之后,将原本的queuedEvents置为null,代表该异步消息总线已经中的所有异步消息队列已经全部启动。

def post(event: SparkListenerEvent): Unit = {if (stopped.get()) {return}metrics.numEventsPosted.inc()// If the event buffer is null, it means the bus has been started and we can avoid// synchronization and post events directly to the queues. This should be the most// common case during the life of the bus.if (queuedEvents == null) {postToQueues(event)return}// Otherwise, need to synchronize to check whether the bus is started, to make sure the thread// calling start() picks up the new event.synchronized {if (!started.get()) {queuedEvents += eventreturn}}// If the bus was already started when the check above was made, just post directly to the// queues.postToQueues(event)
}private def postToQueues(event: SparkListenerEvent): Unit = {val it = queues.iterator()while (it.hasNext()) {it.next().post(event)}
}

当消息总线中存在事件需要被发布,那么需要调用post()方法进行调用,如果queuedEvents为null,说明该消息总线中的所有异步消息队列已经全部启动,那么直接通过postToqueues()方法将事件依次通过各个消息队列的psot()方法进行发布,否则存放在queuedEvents中,等待各个队列启动后进行发布。

AsyncEventQueue的核心是下面这个dispatchThread。

private val dispatchThread = new Thread(s"spark-listener-group-$name") {setDaemon(true)override def run(): Unit = Utils.tryOrStopSparkContext(sc) {dispatch()}
}private[scheduler] def start(sc: SparkContext): Unit = {if (started.compareAndSet(false, true)) {this.sc = scdispatchThread.start()} else {throw new IllegalStateException(s"$name already started!")}
}

在AsyncEventQueue的构造过程中,会定义该线程,并设置其为守护线程,其run()方法为直接调用dispatch()方法。AsyncEventQueue的start()方法也是直接启动该线程。

if (stopped.get()) {return
}eventCount.incrementAndGet()
if (eventQueue.offer(event)) {return
}

当消息总校调用了AsyncEventQueue的post()方法时,是直接将其放入到队列当中eventQueue中,等待之前的dispatchThread线程获取并处理。

private def dispatch(): Unit = LiveListenerBus.withinListenerThread.withValue(true) {var next: SparkListenerEvent = eventQueue.take()while (next != POISON_PILL) {val ctx = processingTime.time()try {super.postToAll(next)} finally {ctx.stop()}eventCount.decrementAndGet()next = eventQueue.take()}eventCount.decrementAndGet()
}

在dispatch线程中,会不断从队列中获取发布的时间,并通过postToAll()发布到其中的监听器listener中。

postToAll()的具体逻辑实现在ListenBus这一特征中,在这里会调用该特征中所有具体的监听器去对该事件进行通知,在这里会变成同步调用。

具体的方法处理,可以参考SparkListenBus的doPostEvent()方法。

protected override def doPostEvent(listener: SparkListenerInterface,event: SparkListenerEvent): Unit = {event match {case stageSubmitted: SparkListenerStageSubmitted =>listener.onStageSubmitted(stageSubmitted)case stageCompleted: SparkListenerStageCompleted =>listener.onStageCompleted(stageCompleted)case jobStart: SparkListenerJobStart =>listener.onJobStart(jobStart)case jobEnd: SparkListenerJobEnd =>listener.onJobEnd(jobEnd)case taskStart: SparkListenerTaskStart =>listener.onTaskStart(taskStart)case taskGettingResult: SparkListenerTaskGettingResult =>listener.onTaskGettingResult(taskGettingResult)case taskEnd: SparkListenerTaskEnd =>listener.onTaskEnd(taskEnd)case environmentUpdate: SparkListenerEnvironmentUpdate =>listener.onEnvironmentUpdate(environmentUpdate)case blockManagerAdded: SparkListenerBlockManagerAdded =>listener.onBlockManagerAdded(blockManagerAdded)case blockManagerRemoved: SparkListenerBlockManagerRemoved =>listener.onBlockManagerRemoved(blockManagerRemoved)case unpersistRDD: SparkListenerUnpersistRDD =>listener.onUnpersistRDD(unpersistRDD)case applicationStart: SparkListenerApplicationStart =>listener.onApplicationStart(applicationStart)case applicationEnd: SparkListenerApplicationEnd =>listener.onApplicationEnd(applicationEnd)case metricsUpdate: SparkListenerExecutorMetricsUpdate =>listener.onExecutorMetricsUpdate(metricsUpdate)case executorAdded: SparkListenerExecutorAdded =>listener.onExecutorAdded(executorAdded)case executorRemoved: SparkListenerExecutorRemoved =>listener.onExecutorRemoved(executorRemoved)case executorBlacklistedForStage: SparkListenerExecutorBlacklistedForStage =>listener.onExecutorBlacklistedForStage(executorBlacklistedForStage)case nodeBlacklistedForStage: SparkListenerNodeBlacklistedForStage =>listener.onNodeBlacklistedForStage(nodeBlacklistedForStage)case executorBlacklisted: SparkListenerExecutorBlacklisted =>listener.onExecutorBlacklisted(executorBlacklisted)case executorUnblacklisted: SparkListenerExecutorUnblacklisted =>listener.onExecutorUnblacklisted(executorUnblacklisted)case nodeBlacklisted: SparkListenerNodeBlacklisted =>listener.onNodeBlacklisted(nodeBlacklisted)case nodeUnblacklisted: SparkListenerNodeUnblacklisted =>listener.onNodeUnblacklisted(nodeUnblacklisted)case blockUpdated: SparkListenerBlockUpdated =>listener.onBlockUpdated(blockUpdated)case speculativeTaskSubmitted: SparkListenerSpeculativeTaskSubmitted =>listener.onSpeculativeTaskSubmitted(speculativeTaskSubmitted)case _ => listener.onOtherEvent(event)}
}

将会根据发布的时间类型,调用监听器listener对应的方法达成对消息的反应。

spark的异步消息总线LiveListenBus相关推荐

  1. kafka之Producer同步与异步消息发送及事务幂等性案例应用实战

    本套系列博客从真实商业环境抽取案例进行总结和分享,并给出Spark商业应用实战指导,请持续关注本套博客.版权声明:本套Spark商业应用实战归作者(秦凯新)所有,禁止转载,欢迎学习. 秦凯新的技术社区 ...

  2. Spark详解(四):Spark组件以及消息通信原理

    1. Spark核心基本概念 Application(应用程序):指用户编写的Spark应用程序,包含驱动程序(Driver)和分布在集群中多个节点之上的Executor代码,在执行过程中由一个或多个 ...

  3. 微服务中的异步消息通讯

    前言 在上一篇文章中,我们说到了异步消息通讯,下面这篇文章呢,大部分内容是翻译来自于这篇微软的文章,所以其内容还是具有一定的理论指导意义的. 当我们跨多个微服务进行内部通讯的时候,异步消息和事件驱动至 ...

  4. Android组件化方案及组件消息总线modular-event实战

    背景 组件化作为Android客户端技术的一个重要分支,近年来一直是业界积极探索和实践的方向.美团内部各个Android开发团队也在尝试和实践不同的组件化方案,并且在组件化通信框架上也有很多高质量的产 ...

  5. 阿里P8架构师谈:多线程、架构、异步消息、Redis等性能优化策略

    常见性能优化策略分类 1.代码 之所以把代码放到第一位,是因为这一点最容易引起技术人员的忽视.很多技术人员拿到一个性能优化的需求以后,言必称缓存.异步.JVM等.实际上,第一步就应该是分析相关的代码, ...

  6. Message Bus - 消息总线

    Message Bus - 消息总线 Liferay的*消息总线(Message Bus)*是一种服务级API,组件可以用它来发送和接收消息.它提供了消息生产者(producers)和消费者(cons ...

  7. 聊聊C++跨类通信机制之消息总线及其实现

    如果没有怎么写过项目,更确切地说是没有用面向对象的语言写过项目.就不会明白跨类通信这个需求是多么刚需. 为什么要跨类通信?把所有的逻辑都写在一个类中,都是一家人,那就不需要通信了啊.是,确实是这样,当 ...

  8. springcloud微服务架构开发实战:分布式消息总线

    消息总线的定义 前面在1.4.2节中强调过,在微服务架构中,经常会使用REST 服务或基于消息的通信机制. 在3.6节中也详细介绍了消息通信的实现方式.消息总线就是一种基于消息的通信机制. 消息总线是 ...

  9. Android 消息总线汇总(一)

    消息总线的演进之路: 广播,Handler,–>EventBus–>RxBus–>LiveDataBus Handler 面试:Handler内存泄露的原因是什么? https:// ...

最新文章

  1. 通过源代码研究ASP.NET MVC中的Controller和View(二)
  2. 《Effective Java读书笔记》--序列化
  3. webbrowser载入地图网页出现脚本错误解决
  4. 中石油训练赛 - 奎奎发红包(贪心)
  5. Java获取数据类型
  6. 对java的集合的理解_谈谈你对java集合类的理解
  7. java freemarker_Java: FreeMarker的配置和使用
  8. tcp测试监听工具_linux 下两款网络性能测试工具介绍
  9. 页面导入样式时,使用link和@import有什么区别?
  10. softmax回归的简洁实现-09-p5
  11. Golang教程:结构体
  12. java if ( 常量==变量)_Java常量、变量和运算符
  13. windows强制删除文件命令
  14. 妈妈见我来了的香港旅游局
  15. ASDFZ 3633 -- 排兵布阵
  16. RPC + Dubbo
  17. 计算机中二进制的加法
  18. 灵魂拷问:Java的可变参数究竟是怎么一回事?
  19. Aragon:以太坊上的去中心化自治组织管理应用
  20. 我用深度学习做个视觉AI微型处理器!

热门文章

  1. VUE自学日志03-模板语法
  2. 03-搭建Eureka注册中心和服务端
  3. Junit4集成到Maven工程
  4. Guava事件处理组件Eventbus使用入门
  5. springboot 启动加载数据 commandLineRunner
  6. github get 请求指定页面的代码
  7. windows服务器远程执行命令(PowerShell+WinRM)
  8. Alpha 冲刺报告2
  9. Infinity loop in cursor iteration
  10. Unity3D之如何创建正确的像素比在屏幕上