1. Overview

Reposted for my own study. I recommend reading the original article directly: Akka Source Code Analysis - local - DeathWatch

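Lifecycle monitoring, also called DeathWatch, is a mechanism used all the time in Akka programming. For example, once we hold an actor's ActorRef, we may want to receive a corresponding message after that actor dies, and the watch function serves exactly this purpose.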

import akka.actor.{ Actor, Props, Terminated }

class WatchActor extends Actor {
  val child = context.actorOf(Props.empty, "child")
  context.watch(child) // <-- this is the only call needed for registration
  var lastSender = context.system.deadLetters

  def receive = {
    case "kill" =>
      context.stop(child)
      lastSender = sender()
    case Terminated(`child`) =>
      lastSender ! "finished"
  }
}

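We start from an example in the official documentation. DeathWatch is actually very convenient to use: call context.watch, and once the watched actor stops for whatever reason, you receive a Terminated message whose only parameter is the ActorRef of the stopped actor. It looks simple, but how is it actually implemented?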

/**
 * Registers this actor as a Monitor for the provided ActorRef.
 * This actor will receive a Terminated(subject) message when watched
 * actor is terminated.
 *
 * `watch` is idempotent if it is not mixed with `watchWith`.
 *
 * It will fail with an [[IllegalStateException]] if the same subject was watched before using `watchWith`.
 * To clear the termination message, unwatch first.
 *
 * *Warning*: This method is not thread-safe and must not be accessed from threads other
 * than the ordinary actor message processing thread, such as [[java.util.concurrent.CompletionStage]] and [[scala.concurrent.Future]] callbacks.
 *
 * @return the provided ActorRef
 */
def watch(subject: ActorRef): ActorRef

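Above is the official Scaladoc for watch in ActorContext. It is very simple: watch an actor and you will receive the corresponding Terminated message. It also says that watch is idempotent as long as it is not mixed with watchWith, and that the method is not thread-safe.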

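If you have read my earlier source-analysis articles, you know that context is an instance of ActorContext, and ActorContext is one functional facet of ActorCell, so the concrete implementation of watch must live in ActorCell. Because ActorCell mixes in many traits, I won't walk through how to find the class that implements watch; here is the answer directly: dungeon.DeathWatch.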

private[akka] trait DeathWatch { this: ActorCell ⇒

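First of all, it is a trait with a self-type annotation (this: ActorCell). I have complained about this pattern before, so I won't expand on it here. Let's look at how watch is implemented.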

override final def watch(subject: ActorRef): ActorRef = subject match {
  case a: InternalActorRef ⇒
    if (a != self) {
      if (!watchingContains(a))
        maintainAddressTerminatedSubscription(a) {
          a.sendSystemMessage(Watch(a, self)) // ➡➡➡ NEVER SEND THE SAME SYSTEM MESSAGE OBJECT TO TWO ACTORS ⬅⬅⬅
          updateWatching(a, None)
        }
      else
        checkWatchingSame(a, None)
    }
    a
}

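Several simple points can be read off the source above:

 1. An actor cannot watch itself: if subject is self, the call is silently ignored;
 2. If the subject is already being watched, checkWatchingSame is called;
 3. If it has not been watched yet, a Watch system message is sent to the watched actor;
 4. In that same not-yet-watched case, the watching record is updated via updateWatching.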
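The four points above can be imitated with a toy, single-threaded model in plain Scala. This is a hypothetical sketch rather than Akka code: `Ref`, `Watch`, `ToyWatcher` and its `outbox` buffer are invented for illustration, and checkWatchingSame is reduced to a simple consistency check.

```scala
import scala.collection.mutable

// Hypothetical stand-ins for ActorRef and the Watch system message (not Akka's classes).
final case class Ref(path: String, uid: Int)
final case class Watch(watchee: Ref, watcher: Ref)

final class ToyWatcher(val self: Ref) {
  // Like DeathWatch.watching: None = deliver Terminated, Some(msg) = custom watchWith message.
  var watching: Map[Ref, Option[Any]] = Map.empty
  // System messages this watcher would send; a real actor enqueues them in the watchee's mailbox.
  val outbox: mutable.Buffer[(Ref, Watch)] = mutable.Buffer.empty

  def watch(subject: Ref): Ref = {
    if (subject != self) {                             // 1. watching yourself is a no-op
      if (!watching.contains(subject)) {
        outbox.append(subject -> Watch(subject, self)) // 3. tell the watchee exactly once
        watching = watching.updated(subject, None)     // 4. remember the registration
      } else {
        checkWatchingSame(subject, None)               // 2. already watched: only validate
      }
    }
    subject
  }

  // Simplified: reject a plain watch after a watchWith that registered a different message.
  private def checkWatchingSame(ref: Ref, newMessage: Option[Any]): Unit =
    if (watching(ref) != newMessage)
      throw new IllegalStateException(s"$ref is already watched with a different termination message")
}
```

Calling `watch(other)` twice leaves a single `Watch` in `outbox`, which is the idempotence promised by the Scaladoc quoted earlier.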

/**
 * This map holds a [[None]] for actors for which we send a [[Terminated]] notification on termination,
 * ``Some(message)`` for actors for which we send a custom termination message.
 */
private var watching: Map[ActorRef, Option[Any]] = Map.empty

//   when all actor references have uid, i.e. actorFor is removed
private def watchingContains(subject: ActorRef): Boolean =
  watching.contains(subject) || (subject.path.uid != ActorCell.undefinedUid &&
    watching.contains(new UndefinedUidActorRef(subject)))

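The check for whether an actor is already watched is implemented in an interesting way. watching is a Map. First it checks whether the Map contains the given ActorRef; if not, it checks whether the ref has a UID (that is, its uid is not ActorCell.undefinedUid), and if so it wraps the ref in an UndefinedUidActorRef and checks watching again. Isn't that strange? If the ref itself is not in the map, how could an UndefinedUidActorRef built from it be there? You'd think so, haha, but not really. Let's look at how ActorRef defines equals.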

/**
 * Equals takes path and the unique id of the actor cell into account.
 */
final override def equals(that: Any): Boolean = that match {
  case other: ActorRef ⇒ path.uid == other.path.uid && path == other.path
  case _               ⇒ false
}

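The logic is clear: two ActorRefs are equal only if their paths are equal and their uids are equal. I won't analyze ActorPath equality here; naturally every element of the path has to match.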

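So can two refs have the same path but different uids? Certainly. If an actor is stopped and then created again with the same actorOf arguments (same parent, same name), the new incarnation has a different uid but the same path.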
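The point about incarnations can be checked with a small model. It is a hypothetical sketch: `ToyRef` copies only the equality rule quoted above (path and uid must both match), and `Incarnations.spawn` uses a simple counter where Akka generates its own uid, purely so the example is deterministic.

```scala
// Hypothetical model of an actor reference: a path plus an incarnation uid.
final class ToyRef(val path: String, val uid: Int) {
  // Same rule as ActorRef.equals: path AND uid must both match.
  override def equals(that: Any): Boolean = that match {
    case other: ToyRef => uid == other.uid && path == other.path
    case _             => false
  }
  override def hashCode: Int = 31 * path.hashCode + uid
  override def toString: String = s"$path#$uid"
}

object Incarnations {
  private var lastUid = 0
  // Every "actorOf" with the same parent and name yields the same path but a fresh uid.
  def spawn(path: String): ToyRef = {
    lastUid += 1
    new ToyRef(path, lastUid)
  }
}
```

Two spawns under the same name share a path yet compare unequal, so a notification about the first incarnation cannot be mistaken for one about the second.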

private[akka] class UndefinedUidActorRef(ref: ActorRef) extends MinimalActorRef {
  override val path = ref.path.withUid(ActorCell.undefinedUid)
  override def provider = throw new UnsupportedOperationException("UndefinedUidActorRef does not provide")
}

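UndefinedUidActorRef is a new ActorRef with the same path as the original but with uid set to ActorCell.undefinedUid. Since equals compares uids, an entry stored in watching without a uid never equals a ref that carries one; converting the subject to an UndefinedUidActorRef is what lets watchingContains find such entries.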
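The fallback lookup in watchingContains can be sketched with the same kind of toy model. Assumptions of this sketch: `UidRef` is a hypothetical case class whose generated equality compares path and uid like ActorRef.equals, `withoutUid` plays the role of UndefinedUidActorRef, and the value 0 stands in for ActorCell.undefinedUid.

```scala
object WatchingLookup {
  // Assumption: 0 plays the role of ActorCell.undefinedUid in this toy model.
  val UndefinedUid = 0

  // Case-class equality compares path and uid, mirroring ActorRef.equals.
  final case class UidRef(path: String, uid: Int) {
    // Counterpart of UndefinedUidActorRef: same path, uid erased.
    def withoutUid: UidRef = copy(uid = UndefinedUid)
  }

  // Same two-step lookup as DeathWatch.watchingContains.
  def watchingContains(watching: Map[UidRef, Option[Any]], subject: UidRef): Boolean =
    watching.contains(subject) ||
      (subject.uid != UndefinedUid && watching.contains(subject.withoutUid))
}
```

A plain `contains` misses an entry that was registered without a uid; the second lookup with the uid erased finds it, while an entry for a different incarnation still does not match.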

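maintainAddressTerminatedSubscription always runs the block passed to it. For a local actor that is all it does; for a remote actor it additionally subscribes to or unsubscribes from AddressTerminated events, depending on whether any non-local actors remain among those being watched or watching. I won't analyze that part here.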

private def updateWatching(ref: InternalActorRef, newMessage: Option[Any]): Unit =
  watching = watching.updated(ref, newMessage)

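updateWatching is simple: it inserts the ActorRef to be watched into the watching Map. As for the value stored against that ActorRef, the comment on watching above already tells us: None means a standard Terminated will be delivered, while Some(message) holds the custom message registered via watchWith, which I won't analyze further here. Next, let's look at how the watched actor responds when it receives the Watch message.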

case Watch(watchee, watcher) ⇒ addWatcher(watchee, watcher)

This hits the branch above in ActorCell.systemInvoke.

protected def addWatcher(watchee: ActorRef, watcher: ActorRef): Unit = {
  val watcheeSelf = watchee == self
  val watcherSelf = watcher == self

  if (watcheeSelf && !watcherSelf) {
    if (!watchedBy.contains(watcher)) maintainAddressTerminatedSubscription(watcher) {
      watchedBy += watcher
      if (system.settings.DebugLifecycle) publish(Debug(self.path.toString, clazz(actor), s"now watched by $watcher"))
    }
  } else if (!watcheeSelf && watcherSelf) {
    watch(watchee)
  } else {
    publish(Warning(self.path.toString, clazz(actor), "BUG: illegal Watch(%s,%s) for %s".format(watchee, watcher, self)))
  }
}

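Normally the first branch is taken (the watchee is self and the watcher is another actor), and it is quite simple: look up watcher in watchedBy and, if it isn't there yet, add it. The second branch handles a Watch in which this actor is the watcher, turning it into an ordinary watch(watchee); any other combination is published as a "BUG" warning.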
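The three branches of addWatcher can be traced with a stripped-down model. It is a hypothetical sketch: actors are plain strings, the `Outcome` values are invented labels for what each branch does, and the debug/warning publishing is left out.

```scala
object AddWatcherModel {
  sealed trait Outcome
  case object Registered extends Outcome        // watchee == self: remember the watcher
  case object AlreadyRegistered extends Outcome // duplicate Watch: the Set already has it
  case object WatchBack extends Outcome         // watcher == self: becomes our own watch(watchee)
  case object IllegalWatch extends Outcome      // anything else: Akka publishes a "BUG" warning

  final class Cell(val self: String) {
    var watchedBy: Set[String] = Set.empty
    var watching: Set[String] = Set.empty

    def addWatcher(watchee: String, watcher: String): Outcome = {
      val watcheeSelf = watchee == self
      val watcherSelf = watcher == self
      if (watcheeSelf && !watcherSelf) {
        if (watchedBy.contains(watcher)) AlreadyRegistered
        else { watchedBy += watcher; Registered }
      } else if (!watcheeSelf && watcherSelf) {
        watching += watchee; WatchBack
      } else IllegalWatch
    }
  }
}
```

Note that a Watch where both sides are self also lands in the last branch, just as in the original.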

private var watchedBy: Set[ActorRef] = ActorCell.emptyActorRefSet

watchedBy is a Set, so its ActorRefs are unique. So when this actor is stopped, when does it notify the actors in watchedBy? That question is actually fairly involved.

To find out when watchedBy is notified, we need to understand the stop logic. So how is ActorCell's stop implemented?

// ➡➡➡ NEVER SEND THE SAME SYSTEM MESSAGE OBJECT TO TWO ACTORS ⬅⬅⬅
final def stop(): Unit = try dispatcher.systemDispatch(this, Terminate()) catch handleException

stop is implemented in the Dispatch trait and is very simple: it uses the current dispatcher to send a Terminate system message to itself.

case Terminate() ⇒ terminate()

On receiving Terminate, the actor calls the terminate method.

protected def terminate() {
  setReceiveTimeout(Duration.Undefined)
  cancelReceiveTimeout

  // prevent Deadletter(Terminated) messages
  unwatchWatchedActors(actor)

  // stop all children, which will turn childrenRefs into TerminatingChildrenContainer (if there are children)
  children foreach stop

  if (systemImpl.aborting) {
    // separate iteration because this is a very rare case that should not penalize normal operation
    children foreach {
      case ref: ActorRefScope if !ref.isLocal ⇒ self.sendSystemMessage(DeathWatchNotification(ref, true, false))
      case _                                  ⇒
    }
  }

  val wasTerminating = isTerminating

  if (setChildrenTerminationReason(ChildrenContainer.Termination)) {
    if (!wasTerminating) {
      // do not process normal messages while waiting for all children to terminate
      suspendNonRecursive()
      // do not propagate failures during shutdown to the supervisor
      setFailed(self)
      if (system.settings.DebugLifecycle) publish(Debug(self.path.toString, clazz(actor), "stopping"))
    }
  } else {
    setTerminated()
    finishTerminate()
  }
}

The logic of terminate is clear; among other things, it asks every child actor to stop. So how is a child stopped?

final def stop(actor: ActorRef): Unit = {
  if (childrenRefs.getByRef(actor).isDefined) {
    @tailrec def shallDie(ref: ActorRef): Boolean = {
      val c = childrenRefs
      swapChildrenRefs(c, c.shallDie(ref)) || shallDie(ref)
    }

    if (actor match {
      case r: RepointableRef ⇒ r.isStarted
      case _                 ⇒ true
    }) shallDie(actor)
  }
  actor.asInstanceOf[InternalActorRef].stop()
}

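This is fairly simple too: it checks whether the actor is one of its children, and if so (and, for a RepointableRef, only once it has started) it marks the child as about to die by swapping childrenRefs, retrying until the swap succeeds. Finally it calls the child's stop(), which sends the child its own Terminate, so stopping proceeds recursively down the tree.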

override def shallDie(actor: ActorRef): ChildrenContainer = TerminatingChildrenContainer(c, Set(actor), UserRequest)

shallDie (the version shown above, from the normal children container) simply creates a TerminatingChildrenContainer, which then replaces childrenRefs.
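The retry loop in stop (read childrenRefs, try to swap in c.shallDie(ref), recurse if another thread changed it first) is an ordinary compare-and-set loop. Here is a hypothetical sketch that uses java.util.concurrent.atomic.AtomicReference in place of swapChildrenRefs; `Normal` and `Terminating` are simplified stand-ins for the real children containers, not their actual definitions.

```scala
import java.util.concurrent.atomic.AtomicReference
import scala.annotation.tailrec

object ShallDieModel {
  // Hypothetical, heavily simplified children containers.
  sealed trait Container { def shallDie(child: String): Container }
  final case class Normal(children: Set[String]) extends Container {
    def shallDie(child: String): Container = Terminating(children, Set(child))
  }
  final case class Terminating(children: Set[String], toDie: Set[String]) extends Container {
    def shallDie(child: String): Container = copy(toDie = toDie + child)
  }

  val childrenRefs = new AtomicReference[Container](Normal(Set("c1", "c2")))

  // Same shape as the loop in ActorCell.stop: read, try to swap, retry on contention.
  // The recursive call on the right of || is in tail position, so @tailrec accepts it.
  @tailrec def shallDie(ref: String): Boolean = {
    val c = childrenRefs.get
    childrenRefs.compareAndSet(c, c.shallDie(ref)) || shallDie(ref)
  }
}
```

Because the new container is computed from the snapshot `c`, a concurrent update makes compareAndSet fail and the loop simply recomputes from the fresh value instead of overwriting it.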

@tailrec final protected def setChildrenTerminationReason(reason: ChildrenContainer.SuspendReason): Boolean = {
  childrenRefs match {
    case c: ChildrenContainer.TerminatingChildrenContainer ⇒
      swapChildrenRefs(c, c.copy(reason = reason)) || setChildrenTerminationReason(reason)
    case _ ⇒ false
  }
}

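The last if statement in terminate calls setChildrenTerminationReason. If the actor has children, childrenRefs is already a TerminatingChildrenContainer at this point, so the call returns true and the actor suspends itself and waits; finishTerminate runs later, once the last child's termination has been handled (watchedActorTerminated calls handleChildTerminated for children). If there are no children, it returns false and the else branch calls setTerminated() and finishTerminate() right away.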

private def finishTerminate() {
  val a = actor
  /* The following order is crucial for things to work properly. Only change this if you're very confident and lucky.
   *
   * Please note that if a parent is also a watcher then ChildTerminated and Terminated must be processed in this
   * specific order.
   */
  try if (a ne null) a.aroundPostStop()
  catch handleNonFatalOrInterruptedException { e ⇒ publish(Error(e, self.path.toString, clazz(a), e.getMessage)) }
  finally try dispatcher.detach(this)
  finally try parent.sendSystemMessage(DeathWatchNotification(self, existenceConfirmed = true, addressTerminated = false))
  finally try stopFunctionRefs()
  finally try tellWatchersWeDied()
  finally try unwatchWatchedActors(a) // stay here as we expect an emergency stop from handleInvokeFailure
  finally {
    if (system.settings.DebugLifecycle)
      publish(Debug(self.path.toString, clazz(a), "stopped"))

    clearActorFields(a, recreate = false)
    clearActorCellFields(this)
    actor = null
  }
}

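So finishTerminate is eventually called, and it in turn calls tellWatchersWeDied. Note the order: this happens after postStop and after the parent has already received its DeathWatchNotification.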
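finishTerminate chains its steps as try a finally try b finally try c ..., so every step runs, in order, even if an earlier one throws. A minimal sketch of that control-flow shape, with hypothetical step names standing in for aroundPostStop, dispatcher.detach, the parent notification, tellWatchersWeDied and the field clearing (stopFunctionRefs and unwatchWatchedActors are omitted):

```scala
import scala.collection.mutable

object FinallyChain {
  final class StepFailed(name: String) extends RuntimeException(s"step failed: $name")

  // `failing` names the (hypothetical) step that should throw, or "" for a clean run.
  def finish(log: mutable.Buffer[String], failing: String): Unit = {
    def step(name: String): Unit = {
      log += name
      if (name == failing) throw new StepFailed(name)
    }

    // Same nesting as finishTerminate: a postStop failure is caught and "published",
    // and every later step still runs even if an earlier one throws.
    try step("postStop")
    catch { case e: StepFailed => log += s"published: ${e.getMessage}" }
    finally try step("detach dispatcher")
    finally try step("notify parent")
    finally try step("tell watchers")
    finally step("clear fields")
  }
}
```

If, say, the detach step throws, the remaining steps still execute before the exception propagates, which is why the watchers are told even when an earlier cleanup step fails.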

protected def tellWatchersWeDied(): Unit =
  if (!watchedBy.isEmpty) {
    try {
      // Don't need to send to parent parent since it receives a DWN by default
      def sendTerminated(ifLocal: Boolean)(watcher: ActorRef): Unit =
        if (watcher.asInstanceOf[ActorRefScope].isLocal == ifLocal && watcher != parent)
          watcher.asInstanceOf[InternalActorRef].sendSystemMessage(DeathWatchNotification(self, existenceConfirmed = true, addressTerminated = false))

      /*
       * It is important to notify the remote watchers first, otherwise RemoteDaemon might shut down, causing
       * the remoting to shut down as well. At this point Terminated messages to remote watchers are no longer
       * deliverable.
       *
       * The problematic case is:
       *  1. Terminated is sent to RemoteDaemon
       *   1a. RemoteDaemon is fast enough to notify the terminator actor in RemoteActorRefProvider
       *   1b. The terminator is fast enough to enqueue the shutdown command in the remoting
       *  2. Only at this point is the Terminated (to be sent remotely) enqueued in the mailbox of remoting
       *
       * If the remote watchers are notified first, then the mailbox of the Remoting will guarantee the correct order.
       */
      watchedBy foreach sendTerminated(ifLocal = false)
      watchedBy foreach sendTerminated(ifLocal = true)
    } finally {
      maintainAddressTerminatedSubscription() {
        watchedBy = ActorCell.emptyActorRefSet
      }
    }
  }

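What does tellWatchersWeDied do? It sends a DeathWatchNotification to every ActorRef in watchedBy, remote watchers first and local ones second, skipping the parent, which already received one in finishTerminate. Note that the first argument of DeathWatchNotification is self, that is, the actor being stopped. Afterwards watchedBy is reset to an empty set.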
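The notification order in tellWatchersWeDied can be expressed as a pure function. This is a hypothetical sketch: `Watcher` is an invented case class whose `isLocal` flag stands in for ActorRefScope.isLocal, and instead of sending messages the function returns the order in which they would be sent.

```scala
object TellWatchersModel {
  // Hypothetical watcher description: a name plus whether it lives in this actor system.
  final case class Watcher(name: String, isLocal: Boolean)

  // Two passes like tellWatchersWeDied: remote watchers first, then local ones,
  // skipping the parent, which already got its DeathWatchNotification in finishTerminate.
  def notificationOrder(watchedBy: Set[Watcher], parent: Watcher): Seq[Watcher] = {
    def pass(ifLocal: Boolean): Seq[Watcher] =
      watchedBy.toSeq.filter(w => w.isLocal == ifLocal && w != parent)
    pass(ifLocal = false) ++ pass(ifLocal = true)
  }
}
```

Splitting the work into two filtered passes, rather than sorting, keeps the code close to the original's two `foreach` calls.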

case DeathWatchNotification(a, ec, at) ⇒ watchedActorTerminated(a, ec, at)

And how does the watcher respond when it receives a DeathWatchNotification?

/**
 * When this actor is watching the subject of [[akka.actor.Terminated]] message
 * it will be propagated to user's receive.
 */
protected def watchedActorTerminated(actor: ActorRef, existenceConfirmed: Boolean, addressTerminated: Boolean): Unit = {
  watchingGet(actor) match {
    case None ⇒ // We're apparently no longer watching this actor.
    case Some(optionalMessage) ⇒
      maintainAddressTerminatedSubscription(actor) {
        watching = removeFromMap(actor, watching)
      }
      if (!isTerminating) {
        self.tell(optionalMessage.getOrElse(Terminated(actor)(existenceConfirmed, addressTerminated)), actor)
        terminatedQueuedFor(actor)
      }
  }
  if (childrenRefs.getByRef(actor).isDefined) handleChildTerminated(actor)
}

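Clearly, if the current actor is still watching the dead actor, watchedActorTerminated removes it from watching and then, as long as the current actor is not itself terminating, sends itself either Terminated(actor) or, if the watch was registered with watchWith, the custom message, with the dead actor as sender. That is how the watcher receives the Terminated message for the watched actor.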
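The watcher side can be sketched the same way. In this hypothetical model, actors are strings, `Terminated` is a local case class (not akka.actor.Terminated, which also carries existenceConfirmed and addressTerminated), and enqueueing into `mailbox` stands in for self.tell; the child-handling branch is omitted.

```scala
object TerminatedDelivery {
  final case class Terminated(actor: String)

  final class Watcher {
    // Same shape as DeathWatch.watching: None -> Terminated, Some(msg) -> watchWith message.
    var watching: Map[String, Option[Any]] = Map.empty
    var isTerminating: Boolean = false
    val mailbox = scala.collection.mutable.Queue.empty[Any]

    def watchedActorTerminated(actor: String): Unit =
      watching.get(actor) match {
        case None => // no longer watching this actor: drop the notification
        case Some(optionalMessage) =>
          watching -= actor
          if (!isTerminating) mailbox.enqueue(optionalMessage.getOrElse(Terminated(actor)))
      }
  }
}
```

Removing the entry before delivering means a duplicate notification for the same actor hits the `None` branch and is ignored.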

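Honestly, setting aside the maintenance of child-actor state and the other complicated parts, the short version is: the watcher records which actors it watches, the watchee records which actors watch it, and at the very last moment of stopping the watchee sends its watchers a notification that becomes a Terminated message. Of course, remote mode adds more complexity, which I'll analyze later.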
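The whole idea fits in a few lines of single-threaded Scala. This is a toy model under obvious assumptions, not how Akka is structured: there are no mailboxes, dispatchers, children or remoting, direct method calls stand in for the Watch and DeathWatchNotification system messages, and `MiniDeathWatch.Actor` is an invented class.

```scala
import scala.collection.mutable

object MiniDeathWatch {
  // Each toy actor keeps both halves of the bookkeeping described above.
  final class Actor(val name: String) {
    var watching: Set[Actor] = Set.empty  // actors I watch (the watcher side)
    var watchedBy: Set[Actor] = Set.empty // actors watching me (the watchee side)
    val received: mutable.Buffer[String] = mutable.Buffer.empty
    private var stopped = false

    def watch(other: Actor): Unit =
      if (other ne this) {
        watching += other
        other.watchedBy += this // stands in for sending Watch(other, this)
      }

    def stop(): Unit =
      if (!stopped) {
        stopped = true
        // Like tellWatchersWeDied: the last thing a dying actor does is notify its watchers.
        watchedBy.foreach(_.deathWatchNotification(this))
        watchedBy = Set.empty
      }

    // Like watchedActorTerminated: only react if we still watch the dead actor.
    private def deathWatchNotification(dead: Actor): Unit =
      if (watching.contains(dead)) {
        watching -= dead
        if (!stopped) received += s"Terminated(${dead.name})"
      }
  }
}
```

Stopping the watched actor a second time is a no-op, and the watcher ends up with exactly one Terminated entry.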
