当我初接触akka-cluster的时候,我有一个梦想,希望能充分利用actor自由分布、独立运行的特性实现某种分布式程序。这种程序的计算任务可以进行人为的分割后再把细分的任务分派给分布在多个服务器上的actor上去运算。这些服务器都处于同一集群环境里,它们都是akka-cluster中的节点(node)。akka-cluster的节点数量只需要通过系统配置方式按照计算能力要求随意增减,在集群上运行的分布式程序可以在不修改软件的情况下自动调整actors在各节点上的分布,重新平衡程序运算负载,不受任何影响继续运行。

在前面akka系列的博客里也介绍了一些akka-cluster的情况,最近在“集群环境内编程模式(PICE)”的专题系列里又讨论了如何在集群环境里通过protobuf-gRPC把多个不同类型的数据库服务集成起来。因为集群中的数据库服务是用akka-stream连接的,我们把程序与数据一起作为stream的流元素用Flow发送给相应的数据库服务进行处理。这时一个想法就产生了:当数据库服务接收了一项服务要求后(假设数据处理多是耗时、耗资源的任务)可以对任务进行分割,然后把这些小任务再分发给所属集群内的多个节点上去运算,再按计算要求收集,汇总结果。那么如果能按用户数量和运算任务的规模来任意添减服务器数量就能满足任何规模的运算需求了。最重要的是这种集群节点规模调整必须是某种配置方式,即通过修改配置文件,但不需要修改软件代码。这些需要恰恰又是akka-cluster的特殊能力。所以决定开个akka-cluster的专题系列来具体讨论集群环境下的分布式软件开发模式。

akka-cluster提供的以下几种方式比较符合我们的要求:

1、distributed pub/sub - 分布式发布订阅模式

2、cluster-singleton - 单例actor模式

3、cluster-load-balancing - 集群负载均衡模式

4、cluster-sharding - 集群分片模式

在这个系列下面的博客里我们会逐个模式讨论它们在具体编程的使用细节。但首先探讨一下如何通过配置文件来定义akka-cluster节点,实现集群规模调整。

集群节点(cluster node)的生命周期会经历以下阶段:

Joining->Up,Leaving->Exiting,Exiting->Removed,Unreachable->Up,Unreachable->Down,Down->Removed

下面我们就用运行在不同集群节点的actor,通过订阅系统的集群成员状态转换消息来观察每个节点的状态转变:

class EventListener extends Actor with ActorLogging {import EventListner._val cluster = Cluster(context.system)override def preStart(): Unit = {cluster.subscribe(subscriber = self,initialStateMode = InitialStateAsEvents,classOf[MemberEvent],classOf[UnreachableMember])super.preStart()}override def postStop(): Unit = {cluster.unsubscribe(self)super.postStop()}override def receive: Receive = {case MemberJoined(member) =>log.info("{} is JOINING...", member.address)case MemberUp(member) =>log.info("{} is UP!", member.address)case MemberWeaklyUp(member) =>log.info("{} is weakly UP!", member.address)case MemberLeft(member) =>log.info("{} is LEAVING...", member.address)case MemberExited(member) =>log.info("{} is EXITING...", member.address)case MemberRemoved(member, prevStatus) =>log.info("{} is REMOVED! from state {}", member.address, prevStatus)case UnreachableMember(member) =>log.info("{} is UNREACHABLE!", member.address)case ReachableMember(member) =>log.info("{} is REACHABLE!", member.address)case UnreachableDataCenter(datacenter) =>log.info("Data Center {} is UNREACHABLE!", datacenter)case ReachableDataCenter(datacenter) =>log.info("Data Center {} is REACHABLE!", datacenter)case Leave =>cluster.leave(cluster.selfAddress)log.info("{} is asked to leave cluster.",cluster.selfAddress)case Down =>cluster.down(cluster.selfAddress)log.info("{} is asked to shutdown cluster.",cluster.selfAddress)}}

Leave和Down是自定义消息类型:

object EventListner {trait Messages {}case object Leave extends Messagescase object Down extends Messagesdef props = Props(new EventListener)
...
}

akka-cluster最基本的配置文件内容如下:

akka {actor {provider = "cluster"}remote {log-remote-lifecycle-events = offnetty.tcp {hostname = "localhost"port = 2551}}cluster {seed-nodes = ["akka.tcp://ClusterSystem@localhost:2551"]}
}

实际上hostname,port,seed-nodes这些参数都可以在程序里配置,如果有需要,我们只要在配置文件里注明这是一个集群模式的程序就行了,其它参数放到程序里去定义:

akka {actor {provider = "cluster"}
}

然后我们可以在程序里配置缺失的集群参数:

object EventListner {trait Messages {}case object Leave extends Messagescase object Down extends Messagesdef props = Props(new EventListener)def create(host: String = "localhost", port: Int = 0, seednode: String = "") = {var config = ConfigFactory.parseString(s"akka.remote.netty.tcp.hostname=${host}").withFallback(ConfigFactory.parseString(s"akka.remote.netty.tcp.port=${port}"))if (seednode.length > 0) {val strConfig = "akka.cluster.seed-nodes=[\"" + seednode + "\"]"val configSeed = ConfigFactory.parseString(strConfig)config = config.withFallback(configSeed)}config = config.withFallback(ConfigFactory.load("akka-cluster-config"))val clusterSystem = ActorSystem(name="ClusterSystem",config=config)clusterSystem.actorOf(Props[EventListener])}}

在create函数里ConfigFactory.parseString可以把一个字符串转换成集群配置参数,多个参数可以用withFallback来补充定义。

以下是EventListener的测试程序:

import EventListner._
object EventDemo extends App {val listner1 = EventListner.create(port = 2551)  //seed node
  scala.io.StdIn.readLine()val listner2 = EventListner.create()    //port=0 random port
  scala.io.StdIn.readLine()val listner3 = EventListner.create()    //port=0 random port
scala.io.StdIn.readLine()listner3 ! Leavescala.io.StdIn.readLine()listner2 ! Downscala.io.StdIn.readLine()listner1 ! Leavescala.io.StdIn.readLine()}

第一个运行的必须是seednode,因为每个节点在启动时都需要连接seednode。下面是每个阶段的输出结果:

[INFO] [10/22/2018 18:50:40.888] [main] [akka.cluster.Cluster(akka://ClusterSystem)] Cluster Node [akka.tcp://ClusterSystem@localhost:2551] - Started up successfully
[INFO] [10/22/2018 18:50:40.931] [ClusterSystem-akka.actor.default-dispatcher-3] [akka.cluster.Cluster(akka://ClusterSystem)] Cluster Node [akka.tcp://ClusterSystem@localhost:2551] - Node [akka.tcp://ClusterSystem@localhost:2551] is JOINING itself (with roles [dc-default]) and forming new cluster
[INFO] [10/22/2018 18:50:40.933] [ClusterSystem-akka.actor.default-dispatcher-3] [akka.cluster.Cluster(akka://ClusterSystem)] Cluster Node [akka.tcp://ClusterSystem@localhost:2551] - Cluster Node [akka.tcp://ClusterSystem@localhost:2551] dc [default] is the new leader
[INFO] [10/22/2018 18:50:40.943] [ClusterSystem-akka.actor.default-dispatcher-3] [akka.cluster.Cluster(akka://ClusterSystem)] Cluster Node [akka.tcp://ClusterSystem@localhost:2551] - Leader is moving node [akka.tcp://ClusterSystem@localhost:2551] to [Up]
[INFO] [10/22/2018 18:50:41.037] [ClusterSystem-akka.actor.default-dispatcher-4] [akka.tcp://ClusterSystem@localhost:2551/user/$a] akka.tcp://ClusterSystem@localhost:2551 is UP!

[INFO] [10/22/2018 18:50:47.363] [ClusterSystem-akka.actor.default-dispatcher-23] [akka.tcp://ClusterSystem@localhost:51679/user/$a] akka.tcp://ClusterSystem@localhost:51679 is JOINING...
[INFO] [10/22/2018 18:50:47.930] [ClusterSystem-akka.actor.default-dispatcher-4] [akka.cluster.Cluster(akka://ClusterSystem)] Cluster Node [akka.tcp://ClusterSystem@localhost:2551] - Leader is moving node [akka.tcp://ClusterSystem@localhost:51679] to [Up]
[INFO] [10/22/2018 18:50:47.931] [ClusterSystem-akka.actor.default-dispatcher-2] [akka.tcp://ClusterSystem@localhost:2551/user/$a] akka.tcp://ClusterSystem@localhost:51679 is UP!
[INFO] [10/22/2018 18:50:48.109] [ClusterSystem-akka.actor.default-dispatcher-24] [akka.tcp://ClusterSystem@localhost:51679/user/$a] akka.tcp://ClusterSystem@localhost:51679 is UP!

[INFO] [10/22/2018 18:50:53.765] [ClusterSystem-akka.actor.default-dispatcher-17] [akka.tcp://ClusterSystem@localhost:51681/user/$a] akka.tcp://ClusterSystem@localhost:51681 is JOINING...
[INFO] [10/22/2018 18:50:53.930] [ClusterSystem-akka.actor.default-dispatcher-22] [akka.tcp://ClusterSystem@localhost:51679/user/$a] akka.tcp://ClusterSystem@localhost:51681 is JOINING...
[INFO] [10/22/2018 18:50:54.929] [ClusterSystem-akka.actor.default-dispatcher-2] [akka.cluster.Cluster(akka://ClusterSystem)] Cluster Node [akka.tcp://ClusterSystem@localhost:2551] - Leader is moving node [akka.tcp://ClusterSystem@localhost:51681] to [Up]
[INFO] [10/22/2018 18:50:54.929] [ClusterSystem-akka.actor.default-dispatcher-21] [akka.tcp://ClusterSystem@localhost:2551/user/$a] akka.tcp://ClusterSystem@localhost:51681 is UP!

[INFO] [10/22/2018 18:52:00.806] [ClusterSystem-akka.actor.default-dispatcher-32] [akka.tcp://ClusterSystem@localhost:51681/user/$a] akka.tcp://ClusterSystem@localhost:51681 is asked to leave cluster.
[INFO] [10/22/2018 18:52:00.807] [ClusterSystem-akka.actor.default-dispatcher-28] [akka.cluster.Cluster(akka://ClusterSystem)] Cluster Node [akka.tcp://ClusterSystem@localhost:51681] - Marked address [akka.tcp://ClusterSystem@localhost:51681] as [Leaving]
[INFO] [10/22/2018 18:52:00.808] [ClusterSystem-akka.actor.default-dispatcher-42] [akka.tcp://ClusterSystem@localhost:51681/user/$a] akka.tcp://ClusterSystem@localhost:51681 is LEAVING...
[INFO] [10/22/2018 18:52:00.809] [ClusterSystem-akka.actor.default-dispatcher-3] [akka.tcp://ClusterSystem@localhost:51679/user/$a] akka.tcp://ClusterSystem@localhost:51679 is asked to shutdown cluster.
[INFO] [10/22/2018 18:52:00.809] [ClusterSystem-akka.actor.default-dispatcher-16] [akka.cluster.Cluster(akka://ClusterSystem)] Cluster Node [akka.tcp://ClusterSystem@localhost:51679] - Marking node [akka.tcp://ClusterSystem@localhost:51679] as [Down]
[INFO] [10/22/2018 18:52:00.810] [ClusterSystem-akka.actor.default-dispatcher-19] [akka.tcp://ClusterSystem@localhost:2551/user/$a] akka.tcp://ClusterSystem@localhost:51681 is LEAVING...
[INFO] [10/22/2018 18:52:00.933] [ClusterSystem-akka.actor.default-dispatcher-22] [akka.tcp://ClusterSystem@localhost:51679/user/$a] akka.tcp://ClusterSystem@localhost:51681 is LEAVING...
[INFO] [10/22/2018 18:52:01.101] [ClusterSystem-akka.actor.default-dispatcher-24] [akka.cluster.Cluster(akka://ClusterSystem)] Cluster Node [akka.tcp://ClusterSystem@localhost:51679] - Shutting down myself
[INFO] [10/22/2018 18:52:01.102] [ClusterSystem-akka.actor.default-dispatcher-24] [akka.cluster.Cluster(akka://ClusterSystem)] Cluster Node [akka.tcp://ClusterSystem@localhost:51679] - Shutting down...
[INFO] [10/22/2018 18:52:01.104] [ClusterSystem-akka.actor.default-dispatcher-24] [akka.cluster.Cluster(akka://ClusterSystem)] Cluster Node [akka.tcp://ClusterSystem@localhost:51679] - Successfully shut down
[INFO] [10/22/2018 18:52:01.110] [ClusterSystem-akka.actor.default-dispatcher-23] [akka.tcp://ClusterSystem@localhost:51679/user/$a] akka.tcp://ClusterSystem@localhost:2551 is REMOVED! from state Up
[INFO] [10/22/2018 18:52:01.110] [ClusterSystem-akka.actor.default-dispatcher-23] [akka.tcp://ClusterSystem@localhost:51679/user/$a] akka.tcp://ClusterSystem@localhost:51679 is REMOVED! from state Down
[INFO] [10/22/2018 18:52:01.111] [ClusterSystem-akka.actor.default-dispatcher-23] [akka.tcp://ClusterSystem@localhost:51679/user/$a] akka.tcp://ClusterSystem@localhost:51681 is REMOVED! from state Leaving

[INFO] [10/22/2018 18:52:02.925] [ClusterSystem-akka.actor.default-dispatcher-17] [akka.cluster.Cluster(akka://ClusterSystem)] Cluster Node [akka.tcp://ClusterSystem@localhost:2551] - Leader is moving node [akka.tcp://ClusterSystem@localhost:51681] to [Exiting]
[INFO] [10/22/2018 18:52:02.926] [ClusterSystem-akka.actor.default-dispatcher-19] [akka.tcp://ClusterSystem@localhost:2551/user/$a] akka.tcp://ClusterSystem@localhost:51681 is EXITING...
[INFO] [10/22/2018 18:52:02.927] [ClusterSystem-akka.actor.default-dispatcher-18] [akka.cluster.Cluster(akka://ClusterSystem)] Cluster Node [akka.tcp://ClusterSystem@localhost:51681] - Exiting, starting coordinated shutdown
[INFO] [10/22/2018 18:52:02.927] [ClusterSystem-akka.actor.default-dispatcher-21] [akka.tcp://ClusterSystem@localhost:51681/user/$a] akka.tcp://ClusterSystem@localhost:51681 is EXITING...
[INFO] [10/22/2018 18:52:02.934] [ClusterSystem-akka.actor.default-dispatcher-41] [akka.cluster.Cluster(akka://ClusterSystem)] Cluster Node [akka.tcp://ClusterSystem@localhost:51681] - Exiting completed

转载于:https://www.cnblogs.com/tiger-xc/p/9834456.html

Akka-Cluster(0)- 分布式应用开发的一些想法相关推荐

  1. 使用akka构建高并发程序_如何使用Akka Cluster创建简单的应用程序

    使用akka构建高并发程序 If you read my previous story about Scalachain, you probably noticed that it is far fr ...

  2. asp.net 分布式应用开发

    Net Framework推出的许多新技术为上述任务的实现提供了相对简单的解决方案.其中,基于SOAP的Web Service在处理分布式应用时具有比传统的DCOM/CORBA明显的优点,结合基于We ...

  3. ESP8266从点灯到遥控小车(三)——组装小车,0代码开发APP控制小车

    ESP8266从点灯到APP遥控小车(三)--组装小车,0代码开发APP控制小车 文章目录 ESP8266从点灯到APP遥控小车(三)--组装小车,0代码开发APP控制小车 所需组件 一.小车所需组件 ...

  4. Akka 学习(九)Akka Cluster

    参考文章 Gitter Chat,Akka 在线交流平台 Akka Forums,Akka 论坛 Akka in GitHub,Akka 开源项目仓库 Akka Official Website,Ak ...

  5. akka cluster原理

    转载自:Akka入门系列(四):akka cluster原理 在前面remote actor一章提到过,akka remoting是Peer-to-Peer的,所以基于remote功能的cluster ...

  6. Cxf + Spring3.0 入门开发WebService

    转自原文地址:http://sunny.blog.51cto.com/182601/625540/ 由于公司业务需求, 需要使用WebService技术对外提供服务,以前没有做过类似的项目,在网上搜寻 ...

  7. 《OpenGL ES 2.0游戏开发(上卷):基础技术和典型案例》一6.6 本章小结

    本节书摘来异步社区<OpenGL ES 2.0游戏开发(上卷):基础技术和典型案例>一书中的第6章,第6.6节,作者: 吴亚峰 责编: 张涛,更多章节内容可以访问云栖社区"异步社 ...

  8. Eclipse搭建Android5.0应用开发环境 “ndk-build”:launchingfailed问题解决

    Eclipse搭建Android5.0应用开发环境 "ndk-build":launchingfailed问题解决 详细参考http://blog.csdn.net/loongem ...

  9. ActionScript3.0程序开发工具

    做为程序员很重要一点你要尽量使用单一的开发工具,可以进行 AS3.0(ActionScript 3.0)开发工具已经开始不断增加,我列举一下我知道的可以写AS3代码的开发工具:第一个就要说记事本,AS ...

  10. wince 6.0 嵌入式开发指导

    因为最近在弄wince的开发,所以在网上找了点资料. http://blog.csdn.net/aawolf/article/details/2232694 上面的博客介绍了嵌入式开发知识. 它里面相 ...

最新文章

  1. JavaScript的正则表达式实现邮箱校验
  2. Zend Studio 如何配置本地apache服务器使用xdebug调试php脚本
  3. Windows系统下安装分布式事务组件Seata
  4. Spring Cloud中Hystrix 线程隔离导致ThreadLocal数据丢失
  5. jQuery按键监听(模拟QQ聊天:按下回车键发送消息)
  6. matlab的combuilder系列-matlab下做com组件 zzfrom SMTH bbs
  7. Android IPC数据在内核空间中的发送过程分析
  8. 华硕主板固态硬盘不识别_[主板] 开机后无法识别硬盘或SSD的故障排除方式
  9. 零基础应该怎么学剪辑,大概要学多长时间?在磨金石教育学靠谱吗?
  10. Revit API 2018调试闪退
  11. 微信公众号数据2019_年度大榜!2019全国县级媒体公众号百强数据看过来
  12. Android WifiDisplay分析二:Wifi display连接过程
  13. Java等额本息实现
  14. 优秀简历模板分享(前端,Java等通用模板)
  15. 减盐不减味,乌江榨菜掀起轻盐升级革命
  16. Mac下将文件复制到移动硬盘
  17. Abaqus学习-初识Abaqus(悬臂梁)
  18. 常用企业微信开源SCRM对比
  19. java设计捕鱼达人中鱼的动图_鱼gif动态动画图片
  20. 找工作真的难吗?并不是工作难找,而是自满的人越来越多。

热门文章

  1. JavaScript 的 async/await 理解(4)
  2. Is there anyway to discover which ip addresses are connected to the db?
  3. 关于Oracle-SQL语句性能优化
  4. 实战react技术栈+express前后端博客项目(3)-- 后端路由、代理以及静态资源托管等配置说明...
  5. Mac Pro 安装 Sublime Text 3,个性化设置,主题 和 插件 收藏
  6. Redis设计与实现 - chapter7 压缩列表
  7. Oracle教程之管理UNDO(九)--如何解决Oracle ORA-01555错误
  8. Windows说明Linux分区和挂载点
  9. Intergration Service(2005)备忘(之)数据传输处理
  10. 记录Access数据库更新操作大坑一个