Actor源码研究,先附上源码

// ......object Actor {/*** Type alias representing a Receive-expression for Akka Actors.*///#receivetype Receive = PartialFunction[Any, Unit]//#receive/*** emptyBehavior is a Receive-expression that matches no messages at all, ever.*/@SerialVersionUID(1L)object emptyBehavior extends Receive {def isDefinedAt(x: Any) = falsedef apply(x: Any) = throw new UnsupportedOperationException("Empty behavior apply()")}/*** Default placeholder (null) used for "!" to indicate that there is no sender of the message,* that will be translated to the receiving system's deadLetters.*/final val noSender: ActorRef = null
}/*** Actor base trait that should be extended by or mixed to create an Actor with the semantics of the 'Actor Model':* <a href="http://en.wikipedia.org/wiki/Actor_model">http://en.wikipedia.org/wiki/Actor_model</a>** An actor has a well-defined (non-cyclic) life-cycle.*  - ''RUNNING'' (created and started actor) - can receive messages*  - ''SHUTDOWN'' (when 'stop' is invoked) - can't do anything** The Actor's own `akka`.`actor`.`ActorRef` is available as `self`, the current* message’s sender as `sender()` and the `akka`.`actor`.`ActorContext` as* `context`. The only abstract method is `receive` which shall return the* initial behavior of the actor as a partial function (behavior can be changed* using `context.become` and `context.unbecome`).** This is the Scala API (hence the Scala code below), for the Java API see `akka`.`actor`.`UntypedActor`.** {{{* class ExampleActor extends Actor {**   override val supervisorStrategy = OneForOneStrategy(maxNrOfRetries = 10, withinTimeRange = 1 minute) {*     case _: ArithmeticException      => Resume*     case _: NullPointerException     => Restart*     case _: IllegalArgumentException => Stop*     case _: Exception                => Escalate*   }**   def receive = {*                                      // directly calculated reply*     case Request(r)               => sender() ! calculate(r)**                                      // just to demonstrate how to stop yourself*     case Shutdown                 => context.stop(self)**                                      // error kernel with child replying directly to 'sender()'*     case Dangerous(r)             => context.actorOf(Props[ReplyToOriginWorker]).tell(PerformWork(r), sender())**                                      // error kernel with reply going through us*     case OtherJob(r)              => context.actorOf(Props[ReplyToMeWorker]) ! JobRequest(r, sender())*     case JobReply(result, orig_s) => orig_s ! result*   }* }* }}}** The last line demonstrates the essence of the error kernel design: spawn* one-off actors which terminate after doing their job, pass on `sender()` to* allow direct reply if that is what makes sense, or round-trip the sender* as shown with the fictitious JobRequest/JobReply message pair.** If you don’t like writing `context` you can always `import context._` to get* direct access to `actorOf`, `stop` etc. This is not default in order to keep* the name-space clean.*/
trait Actor {import Actor._// to make type Receive known in subclasses without importtype Receive = Actor.Receive/*** Stores the context for this actor, including self, and sender.* It is implicit to support operations such as `forward`.** WARNING: Only valid within the Actor itself, so do not close over it and* publish it to other threads!** `akka`.`actor`.`ActorContext` is the Scala API. `getContext` returns a* `akka`.`actor`.`UntypedActorContext`, which is the Java API of the actor* context.*/implicit val context: ActorContext = {val contextStack = ActorCell.contextStack.getif ((contextStack.isEmpty) || (contextStack.head eq null))throw ActorInitializationException(s"You cannot create an instance of [${getClass.getName}] explicitly using the constructor (new). " +"You have to use one of the 'actorOf' factory methods to create a new actor. See the documentation.")val c = contextStack.headActorCell.contextStack.set(null :: contextStack)c}/*** The 'self' field holds the ActorRef for this actor.* <p/>* Can be used to send messages to itself:* <pre>* self ! message* </pre>*/implicit final val self = context.self //MUST BE A VAL, TRUST ME/*** The reference sender Actor of the last received message.* Is defined if the message was sent from another Actor,* else `deadLetters` in `akka`.`actor`.`ActorSystem`.** WARNING: Only valid within the Actor itself, so do not close over it and* publish it to other threads!*/final def sender(): ActorRef = context.sender()/*** This defines the initial actor behavior, it must return a partial function* with the actor logic.*///#receivedef receive: Actor.Receive//#receive/*** INTERNAL API.** Can be overridden to intercept calls to this actor's current behavior.** @param receive current behavior.* @param msg current message.*/protected[akka] def aroundReceive(receive: Actor.Receive, msg: Any): Unit = receive.applyOrElse(msg, unhandled)/*** Can be overridden to intercept calls to `preStart`. Calls `preStart` by default.*/protected[akka] def aroundPreStart(): Unit = preStart()/*** Can be overridden to intercept calls to `postStop`. Calls `postStop` by default.*/protected[akka] def aroundPostStop(): Unit = postStop()/*** Can be overridden to intercept calls to `preRestart`. Calls `preRestart` by default.*/protected[akka] def aroundPreRestart(reason: Throwable, message: Option[Any]): Unit = preRestart(reason, message)/*** Can be overridden to intercept calls to `postRestart`. Calls `postRestart` by default.*/protected[akka] def aroundPostRestart(reason: Throwable): Unit = postRestart(reason)/*** User overridable definition the strategy to use for supervising* child actors.*/def supervisorStrategy: SupervisorStrategy = SupervisorStrategy.defaultStrategy/*** User overridable callback.* <p/>* Is called when an Actor is started.* Actors are automatically started asynchronously when created.* Empty default implementation.*/@throws(classOf[Exception]) // when changing this you MUST also change UntypedActorDocTest//#lifecycle-hooksdef preStart(): Unit = ()//#lifecycle-hooks/*** User overridable callback.* <p/>* Is called asynchronously after 'actor.stop()' is invoked.* Empty default implementation.*/@throws(classOf[Exception]) // when changing this you MUST also change UntypedActorDocTest//#lifecycle-hooksdef postStop(): Unit = ()//#lifecycle-hooks/*** User overridable callback: '''By default it disposes of all children and then calls `postStop()`.'''* @param reason the Throwable that caused the restart to happen* @param message optionally the current message the actor processed when failing, if applicable* <p/>* Is called on a crashed Actor right BEFORE it is restarted to allow clean* up of resources before Actor is terminated.*/@throws(classOf[Exception]) // when changing this you MUST also change UntypedActorDocTest//#lifecycle-hooksdef preRestart(reason: Throwable, message: Option[Any]): Unit = {context.children foreach { child context.unwatch(child)context.stop(child)}postStop()}//#lifecycle-hooks/*** User overridable callback: By default it calls `preStart()`.* @param reason the Throwable that caused the restart to happen* <p/>* Is called right AFTER restart on the newly created Actor to allow reinitialization after an Actor crash.*/@throws(classOf[Exception]) // when changing this you MUST also change UntypedActorDocTest//#lifecycle-hooksdef postRestart(reason: Throwable): Unit = {preStart()}//#lifecycle-hooks/*** User overridable callback.* <p/>* Is called when a message isn't handled by the current behavior of the actor* by default it fails with either a `akka`.`actor`.`DeathPactException` (in* case of an unhandled `akka`.`actor`.`Terminated` message) or publishes an `akka`.`actor`.`UnhandledMessage`* to the actor's system's `akka`.`event`.`EventStream`*/def unhandled(message: Any): Unit = {message match {case Terminated(dead)  throw new DeathPactException(dead)case _                 context.system.eventStream.publish(UnhandledMessage(message, sender(), self))}}
}
  1. object Actor中的Receive类型是一个偏函数

object Actor {/*** Type alias representing a Receive-expression for Akka Actors.*///#receivetype Receive = PartialFunction[Any, Unit]// ......

2. trait Actor中引用了Receive类型

trait Actor {import Actor._// to make type Receive known in subclasses without importtype Receive = Actor.Receive

3. 每个Actor都有一个隐含的field  context。熟悉JavaEE的读者应该很快想起了servelet对应的session,

servlet容器会自动将很多信息存入session,Akka可以看作actor容器,每次接收message都会把与发送者相关的信息记如context。

  /*** Stores the context for this actor, including self, and sender.* It is implicit to support operations such as `forward`.** WARNING: Only valid within the Actor itself, so do not close over it and* publish it to other threads!** `akka`.`actor`.`ActorContext` is the Scala API. `getContext` returns a* `akka`.`actor`.`UntypedActorContext`, which is the Java API of the actor* context.*/implicit val context: ActorContext = {val contextStack = ActorCell.contextStack.getif ((contextStack.isEmpty) || (contextStack.head eq null))throw ActorInitializationException(s"You cannot create an instance of [${getClass.getName}] explicitly using the constructor (new). " +"You have to use one of the 'actorOf' factory methods to create a new actor. See the documentation.")val c = contextStack.headActorCell.contextStack.set(null :: contextStack)c}

4.每个Actor都有一个ActorRef类型的隐含参数 self,指向自己

  /*** The 'self' field holds the ActorRef for this actor.* <p/>* Can be used to send messages to itself:* <pre>* self ! message* </pre>*/implicit final val self = context.self //MUST BE A VAL, TRUST ME

5. 每个Actor都有一个sender方法,来获取message sender的引用

  /*** The reference sender Actor of the last received message.* Is defined if the message was sent from another Actor,* else `deadLetters` in `akka`.`actor`.`ActorSystem`.** WARNING: Only valid within the Actor itself, so do not close over it and* publish it to other threads!*/final def sender(): ActorRef = context.sender()

6. 终于到了最最关键的receive方法,我们自己的业务逻辑就写在这个方法里。

  /*** This defines the initial actor behavior, it must return a partial function* with the actor logic.*///#receivedef receive: Actor.Receive//#receive

我们在继承Actor,实现receive方法的时候,通常简单地写成:

def receive {case ...case ...
}

这里定义了一个偏函数,为什么可以不加函数类型呢? 因为这个方法的类型在trait Actor中已经定义为了:Actor.Receive,也即:

PartialFunction[Any, Unit]

熟悉JavaEE的同学是不是又很自然地想起了Servelet的doGet和doPost方法?

源码就看的这里。

转载于:https://blog.51cto.com/dingbo/1593602

Akka 2 Actor 源码相关推荐

  1. 【akka】akka源码 Akka源码分析-FSM

    1.概述 转载自己学习,建议直接看原文:Akka源码分析-FSM akka还有一个不常使用.但我觉得比较方便的一个模块,那就是FSM(有限状态机).我们知道了akka中Actor模型的具体实现之后,就 ...

  2. Akka源码分析-FSM

    akka还有一个不常使用.但我觉得比较方便的一个模块,那就是FSM(有限状态机).我们知道了akka中Actor模型的具体实现之后,就会发现,Akka的actor可以非常方便的实现FSM.其实在akk ...

  3. Akka源码分析-Remote-发消息

    上一篇博客我们介绍了remote模式下Actor的创建,其实与local的创建并没有太大区别,一般情况下还是使用LocalActorRef创建了Actor.那么发消息是否意味着也是相同的呢? 既然ac ...

  4. Akka源码分析-Akka Typed

    对不起,akka typed 我是不准备进行源码分析的,首先这个库的API还没有release,所以会may change,也就意味着其概念和设计包括API都会修改,基本就没有再深入分析源码的意义了. ...

  5. 【akka】Akka源码分析-local-DeathWatch

    1.概述 转载自己学习,建议直接看原文:Akka源码分析-local-DeathWatch 生命周期监控,也就是死亡监控,是akka编程中常用的机制.比如我们有了某个actor的ActorRef之后, ...

  6. 【akka】Akka源码分析-Event Bus

    1.概述 转载自己学习,建议直接看原文:Akka源码分析-FSM akka中的EventBus其实是不常用,也最容易被忽略的一个组件. 但如果你深入Cluster的实现就会发现,这个东西其实还挺有用的 ...

  7. 第43课: Spark 1.6 RPC内幕解密:运行机制、源码详解、Netty与Akka等

    第43课: Spark 1.6 RPC内幕解密:运行机制.源码详解.Netty与Akka等 Spark 1.6推出了以RpcEnv.RPCEndpoint.RPCEndpointRef为核心的新型架构 ...

  8. spark 源码分析 Blockmanager

    原文链接 参考, Spark源码分析之-Storage模块 对于storage, 为何Spark需要storage模块?为了cache RDD  Spark的特点就是可以将RDD cache在memo ...

  9. Apache Spark源码走读之6 -- 存储子系统分析

    Spark计算速度远胜于Hadoop的原因之一就在于中间结果是缓存在内存而不是直接写入到disk,本文尝试分析Spark中存储子系统的构成,并以数据写入和数据读取为例,讲述清楚存储子系统中各部件的交互 ...

  10. Spark源码和调优简介 Spark Core

    作者:calvinrzluo,腾讯 IEG 后台开发工程师 本文基于 Spark 2.4.4 版本的源码,试图分析其 Core 模块的部分实现原理,其中如有错误,请指正.为了简化论述,将部分细节放到了 ...

最新文章

  1. 周志华:“深”为什么重要,以及还有什么深的网络
  2. 清华优秀毕业生放弃留学上热搜!计算机系前10名中9人留校深造
  3. 查看oracle已经锁定的表,Oracle中查询被锁定的表
  4. 动词ing形式的5种用法_课后文档:英语语法思维第7课动词的5个形式的用法
  5. Android textview换行
  6. 盘点Git的那些冷门玩法
  7. 获取设备IMEI ,手机名称,系统SDK版本号,系统版本号
  8. Grunt-Kmc基于KISSY项目打包
  9. text-overflow:ellipsis溢出显示省略号兼容所有浏览器的解决办法
  10. php gtk教程,PHP-GTK 介绍及其应用-PHP教程,PHP应用
  11. Android webview mqtt,APICloud
  12. MySQL Innodb data_free 清理
  13. 拓端tecdat|Python银行机器学习:回归、随机森林、KNN近邻、决策树、高斯朴素贝叶斯、支持向量机svm分析营销活动数据
  14. 两台电脑共享鼠标键盘Synergy
  15. 乾颐堂现任明教教主(2014年课程)TCPIP协议详解卷一 第二节课笔记
  16. linux安装Python 以及Python包
  17. windows x64和x86的区别
  18. opc java_Java OPC 代码
  19. canvas下雪效果(原生js)
  20. 银行卡号每输四位加空格,及银行卡的识别(此银行卡号是那个银行)

热门文章

  1. 光缆成端接头的含义是指
  2. C++笔记(6)友元
  3. 重复insmod同一个模块导致段错误
  4. Linux电源管理-wakeup events framework
  5. Linux设备模型初始化流程
  6. 对zebra的一点思考 --- 002
  7. 块层介绍 第一篇: bio层
  8. 使用ffmpeg步骤
  9. Python爬虫入门学习--中国大学排名
  10. ACM动态规划总结(by utobe67)