通过一段时间的学习了解,加深了一些对Akka的认识,特别是对于Akka在实际编程中的用途方面。我的想法,或者我希望利用Akka来达到的目的是这样的:作为传统方式编程的老兵,我们已经习惯了直线流程方式一口气实现完整的功能。如果使用Akka,我们可以把这个完整的功能分切成多个能产生中间临时结果的小功能然后把这些功能放到不同的Actor上分别独立运算,再通过消息来连接这些功能集合成最终结果。如此我们就轻易得到了一个多线程并发程序。由于Akka是软件工具(Tool),没有软件架构(Framework)对编程方式的特别要求,Actor的构建和使用非常方便,我们甚至不需要多少修改就可以直接把原来的一段代码移到Actor上。如果遇到一些重复的运算,我们还可以用Routing来实现并行运算。当然,把Actor当作简单的行令运算器可能还不够,如果能实现一些具体运算之上的高层次程序逻辑和流程就更加完善。我们可以用这样的高层次Actor去解析程序逻辑、执行流程、把具体的运算分配给其它各种运算Actor或者一组Routees并行运算从而取得整体程序的高效率运行。具备了这些功能后,也许我们就可以完全用Actor模式来替代传统单线程行令编程了。Akka可以通过Actor的动态行为转换来实现同一Actor在不同情况下提供不同的功能支持。我们前面提到Actor的功能是在receive函数内实现的。那么转换功能是否就是切换不同的receive函数呢?答案是确定的,Akka是通过Actor的context.become(rcvFunc)来实现receive函数切换的,我们看看下面这个示范:

import akka.actor._object FillSeasons {case object HowYouFeeldef props = Props(new FillSeasons)
}class FillSeasons extends Actor with ActorLogging {import FillSeasons._override def receive: Receive = springdef Winter: Receive = {case HowYouFeel =>log.info("It's freezing cold!")}def summer: Receive = {case HowYouFeel =>log.info("It's hot hot hot!")}def spring: Receive = {case HowYouFeel =>log.info("It feels so goooood!")}
}object Becoming extends App {val demoSystem = ActorSystem("demoSystem")val feelingsActor = demoSystem.actorOf(FillSeasons.props,"feelings")feelingsActor ! FillSeasons.HowYouFeel}

在FeelingsActor里我们定义了三个receive函数,对共同的HowYouFeel消息采取了不同的反应。默认行为是spring。那么应该如何在三种行为中切换呢?用context.become(???),如下:

import akka.actor._object FillSeasons {case object HowYouFeelcase object ToSummercase object ToSpringcase object ToWinterdef props = Props(new FillSeasons)
}class FillSeasons extends Actor with ActorLogging {import FillSeasons._override def receive: Receive = springdef winter: Receive = {case HowYouFeel =>log.info("It's freezing cold!")case ToSummer => context.become(summer)case ToSpring => context.become(spring)}def summer: Receive = {case HowYouFeel =>log.info("It's hot hot hot!")case ToSpring => context.become(spring)case ToWinter => context.become(winter)}def spring: Receive = {case HowYouFeel =>log.info("It feels so goooood!")case ToSummer => context.become(summer)case ToWinter => context.become(winter)}
}object Becoming extends App {val demoSystem = ActorSystem("demoSystem")val feelingsActor = demoSystem.actorOf(FillSeasons.props,"feelings")feelingsActor ! FillSeasons.HowYouFeelfeelingsActor ! FillSeasons.ToSummerfeelingsActor ! FillSeasons.HowYouFeelfeelingsActor ! FillSeasons.ToWinterfeelingsActor ! FillSeasons.HowYouFeelfeelingsActor ! FillSeasons.ToSpringfeelingsActor ! FillSeasons.HowYouFeelscala.io.StdIn.readLine()demoSystem.terminate()
}

我们增加了三个消息来切换receive。运算结果如下:

[INFO] [06/08/2017 17:51:46.013] [demoSystem-akka.actor.default-dispatcher-3] [akka://demoSystem/user/feelings] It feels so goooood!
[INFO] [06/08/2017 17:51:46.019] [demoSystem-akka.actor.default-dispatcher-4] [akka://demoSystem/user/feelings] It's hot hot hot!
[INFO] [06/08/2017 17:51:46.028] [demoSystem-akka.actor.default-dispatcher-4] [akka://demoSystem/user/feelings] It's freezing cold!
[INFO] [06/08/2017 17:51:46.028] [demoSystem-akka.actor.default-dispatcher-4] [akka://demoSystem/user/feelings] It feels so goooood!Process finished with exit code 0

就这样在几个receive里窜来窜去的好像已经能达到我们设想的目的了。看看Akka源代码中become和unbecome发现这样的做法是不正确的:

  def become(behavior: Actor.Receive, discardOld: Boolean = true): Unit =behaviorStack = behavior :: (if (discardOld && behaviorStack.nonEmpty) behaviorStack.tail else behaviorStack)def become(behavior: Procedure[Any]): Unit = become(behavior, discardOld = true)def become(behavior: Procedure[Any], discardOld: Boolean): Unit =become({ case msg ⇒ behavior.apply(msg) }: Actor.Receive, discardOld)def unbecome(): Unit = {val original = behaviorStackbehaviorStack =if (original.isEmpty || original.tail.isEmpty) actor.receive :: emptyBehaviorStackelse original.tail}

从上面的代码可以发现:调用become(x)实际上是把x压进了一个堆栈里。如果像我们这样不断调用become转来转去的,在堆栈上留下旧的行为函数实例最终会造成StackOverFlowError。所以Akka提供了unbecome,这是个堆栈弹出函数,把上一个become压进的行为函数再弹出来,释放一个堆栈空间。所以我们应该用unbecome来解决堆栈溢出问题。但是,如果在多个receive函数之间转换来实现行为变化的话,就难以正确掌握堆栈的压进,弹出冲抵配对,并且无法避免所谓的意大利面代码造成的混乱逻辑。所以,become/unbecome最好使用在两个功能之间的转换。我们再设计一个例子来示范:

sealed trait DBOperations
case class DBWrite(sql: String) extends DBOperations
case class DBRead(sql: String) extends DBOperationssealed trait DBStates
case object Connected extends DBStates
case object Disconnected extends DBStates

DBoperations代表数据库读写操作。DBState代表数据库当前状态:连线Connected或断线Disconnected。只有数据库在Connected状态下才能进行数据库操作。顺理成章,我们需要两个receive函数:

import akka.actor._
sealed trait DBOperations
case class DBWrite(sql: String) extends DBOperations
case class DBRead(sql: String) extends DBOperationssealed trait DBStates
case object Connected extends DBStates
case object Disconnected extends DBStatesobject DBOActor {def props = Props(new DBOActor)
}class DBOActor extends Actor with ActorLogging {override def receive: Receive = disconnecteddef disconnected: Receive = {case Connected =>log.info("Logon to DB.")context.become(connected)}def connected: Receive = {case Disconnected =>log.info("Logoff from DB.")context.unbecome()case DBWrite(sql) =>log.info(s"Writing to DB: $sql")case DBRead(sql) =>log.info(s"Reading from DB: $sql")}
}object BecomeDB extends App {val dbSystem = ActorSystem("dbSystem")val dbActor = dbSystem.actorOf(DBOActor.props,"dbActor")dbActor ! ConnecteddbActor ! DBWrite("Update table x")dbActor ! DBRead("Select from table x")dbActor ! Disconnectedscala.io.StdIn.readLine()dbSystem.terminate()}

运算结果显示如下:

[INFO] [06/09/2017 11:44:40.093] [dbSystem-akka.actor.default-dispatcher-3] [akka://dbSystem/user/dbActor] Logon to DB.
[INFO] [06/09/2017 11:44:40.106] [dbSystem-akka.actor.default-dispatcher-3] [akka://dbSystem/user/dbActor] Writing to DB: Update table x
[INFO] [06/09/2017 11:44:40.107] [dbSystem-akka.actor.default-dispatcher-3] [akka://dbSystem/user/dbActor] Reading from DB: Select from table x
[INFO] [06/09/2017 11:44:40.107] [dbSystem-akka.actor.default-dispatcher-3] [akka://dbSystem/user/dbActor] Logoff from DB.

以上是按正确顺序向dbActor发出数据库操作指令后产生的结果。但是,我们是在一个多线程消息驱动的环境里。发送给dbActor的消息收到时间无法预料。我们试着调换一下指令到达顺序:

  dbActor ! DBWrite("Update table x")dbActor ! ConnecteddbActor ! DBRead("Select from table x")dbActor ! Disconnected

运算结果:

[INFO] [06/09/2017 11:54:57.264] [dbSystem-akka.actor.default-dispatcher-4] [akka://dbSystem/user/dbActor] Logon to DB.
[INFO] [06/09/2017 11:54:57.273] [dbSystem-akka.actor.default-dispatcher-4] [akka://dbSystem/user/dbActor] Reading from DB: Select from table x
[INFO] [06/09/2017 11:54:57.273] [dbSystem-akka.actor.default-dispatcher-4] [akka://dbSystem/user/dbActor] Logoff from DB.

漏掉了DBWrite操作。可以理解,所有connected状态之前的任何操作都不会真正生效。Akka提供了个Stash trait能把一个receive函数未处理的消息都存起来。然后用unstash()可以把存储的消息都转移到本Actor的邮箱里。我们可以用Stash来解决这个消息遗失问题:

  def disconnected: Receive = {case Connected =>log.info("Logon to DB.")context.become(connected)unstashAll()case _ => stash()}

所有消息遗失都是在Disconnected状态内发生的。在disconnected里我们用stash把所有非Connected消息存起来,然后在转换成Connected状态时把这些消息转到信箱。再看看运算结果:

object BecomeDB extends App {val dbSystem = ActorSystem("dbSystem")val dbActor = dbSystem.actorOf(DBOActor.props,"dbActor")dbActor ! DBWrite("Update table x")dbActor ! ConnecteddbActor ! DBRead("Select from table x")dbActor ! Disconnectedscala.io.StdIn.readLine()dbSystem.terminate()}[INFO] [06/09/2017 12:01:54.518] [dbSystem-akka.actor.default-dispatcher-4] [akka://dbSystem/user/dbActor] Logon to DB.
[INFO] [06/09/2017 12:01:54.528] [dbSystem-akka.actor.default-dispatcher-4] [akka://dbSystem/user/dbActor] Writing to DB: Update table x
[INFO] [06/09/2017 12:01:54.528] [dbSystem-akka.actor.default-dispatcher-4] [akka://dbSystem/user/dbActor] Reading from DB: Select from table x
[INFO] [06/09/2017 12:01:54.528] [dbSystem-akka.actor.default-dispatcher-4] [akka://dbSystem/user/dbActor] Logoff from DB.

显示结果正确。下面就是整个示范的源代码:

import akka.actor._
sealed trait DBOperations
case class DBWrite(sql: String) extends DBOperations
case class DBRead(sql: String) extends DBOperationssealed trait DBStates
case object Connected extends DBStates
case object Disconnected extends DBStatesobject DBOActor {def props = Props(new DBOActor)
}class DBOActor extends Actor with ActorLogging with Stash {override def receive: Receive = disconnecteddef disconnected: Receive = {case Connected =>log.info("Logon to DB.")context.become(connected)unstashAll()case _ => stash()}def connected: Receive = {case Disconnected =>log.info("Logoff from DB.")context.unbecome()case DBWrite(sql) =>log.info(s"Writing to DB: $sql")case DBRead(sql) =>log.info(s"Reading from DB: $sql")}
}object BecomeDB extends App {val dbSystem = ActorSystem("dbSystem")val dbActor = dbSystem.actorOf(DBOActor.props,"dbActor")dbActor ! DBWrite("Update table x")dbActor ! ConnecteddbActor ! DBRead("Select from table x")dbActor ! Disconnectedscala.io.StdIn.readLine()dbSystem.terminate()}

Akka(6): become/unbecome:运算行为切换相关推荐

  1. Akka in 2 weeks

    11:07下午 八月 23, 2011 in category Java by ingramchen 简而言之 Akka,是一个 inter-server computation framework. ...

  2. 《实战Java高并发程序设计》读后感

    写在前面无关的内容 白驹过隙,看下日历已经毕业4年多,加上在大学里的4年,算算在计算机界也躺了八年,按照格拉德韦尔的1万小时定律差不多我也该成为行业的专家了,然后并没有.当看着"什么是Jav ...

  3. js三进制计算机,js 笔记 - 二进制位运算符

    概述 二进制位运算符用于直接对二进制位进行计算,一共有7个.二进制或运算符(or):符号为|,表示若两个二进制位都为0,则结果为0,否则为1. 二进制与运算符(and):符号为&,表示若两个二 ...

  4. 注会考试不可以用计算机,CPA机考,不让带计算器该如何做?

    在CPA考试中,会有的考场监考老师比较严格,不让自己携带计算器,那么,该如何做呢?>>>点击领取2020年 (一)机考计算器 机考计算器和windows自带计算器一模一样,平时在家自 ...

  5. Scala go java_Java、Scala和Go语言多线程并发对比测试结果和结论

    AMD 双核 2.8G ,4G内存 winxp java+concjava+AKKA1.3java+AKKA2.0Scala+AKKA1.3Scala+AKKA2.0Go+goroutine 1-N ...

  6. movidius 神经计算棒 ncsdk windows 平台 支持 配置 教程

    概述:movidius ncs的ncsdk目前官方还不支持windows,但是inter好像发布了openvino,这个里面是包含ncsdk的好像是支持windows的.现在不讲openvino,讲n ...

  7. 时间加减计算器_财政局刚刚回应丨禁止携带计算器,否则成绩无效!

    我是岛叔,你好. 在大多数的会计相关考试当中,初级.中级.高级.CPA.税务师等等会计考试,计算基本上是必不可少的考试考查内容,所以就出现了这样一个问题:考试能否携带计算器,哪些考试是可以携带计算器, ...

  8. 时间加减计算器_考前急救!2019年注册会计师计算器使用技巧,不会你就out了...

    众所周知注册会计师考试采用闭卷.计算机化考试方式,考生们在平日备考大部分时间都用纸笔答题,从手写变成上机操作还需要大家提前进行练习,以防在考场中对机考操作或录入符号出现不熟悉的现象,从而影响答题进度, ...

  9. 浅谈大数据任务调度平台

    谈到大数据,避免不了hadoop, hive, spark 这些基础套件,但是在整个大数据开发的时候,我们面对的基本上都是数据开发平台和任务调度系统.数据开发平台一般直接面对业务同学,很大程度上影响业 ...

最新文章

  1. Dundas使用手册
  2. 重走JAVA之路(一):复盘ButterKnife-编译时注解
  3. Cisco Catalyst交换机密码恢复策略
  4. 【NLP】有三AI-NLP专栏首季总结与展望
  5. 如何修改Xshell默认存储路径
  6. 一日一技:在Ocelot网关中实现IdentityServer4密码模式(password)
  7. springBoot+mybaits+达梦数据库
  8. Akka之actor模型
  9. pytorch Tensor
  10. sklearn 3. 实例:随机森林在乳腺癌数据上的调参
  11. 锤子濒危、金立倒闭,华米 OV 们如何艰难求生?
  12. 成为java高级工程师需要什么
  13. Eureka服务治理
  14. 8187l网卡驱动 linux版,在Ubuntu中安装使用realtek 8187b无线网卡
  15. 两个强制屏幕旋转的方法
  16. ​​【接口篇 / Wan】(7.0) ❀ 01. 配置 ADSL 拨号上网 ❀ FortiGate 防火墙
  17. 【计算机视觉40例】案例10:隐身术
  18. 常见几种操作系统简介
  19. vue实现:带关键字跳转企查查并搜索关键字对应的企业
  20. 生产注意事项(分片集群)

热门文章

  1. matlab win7兼容,MatLab7.0和win7兼容
  2. ffmpeg 转换flv压缩大小_视频压缩工具ffmpeg的使用
  3. JS 对象直接量方法创建对象
  4. mysql 主键约束起名_MySQL名称的主键约束
  5. 【oracle】查询===Oracle数据库 子查询(嵌套查询)简单例子
  6. 抓包导出的har格式解析
  7. RSA生成密钥对的过程
  8. python 模拟浏览器selenium_python爬虫10:使用selenium模拟浏览器登录账号
  9. Mac如何创建自签名证书?Mac创建自签名证书图文教程
  10. PCB相关知识-焊盘Pad