akka系列文章目录

  • akka学习教程(十四) akka分布式实战
  • akka学习教程(十三) akka分布式
  • akka学习教程(十二) Spring与Akka的集成
  • akka学习教程(十一) akka持久化
  • akka学习教程(十) agent
  • akka学习教程(九) STM软件事务内存
  • akka学习教程(八) Actor中的Future-询问模式
  • akka学习教程(七) 内置状态转换Procedure
  • akka学习教程(六) 路由器Router
  • akka学习教程(五) inbox消息收件箱
  • akka学习教程(四) actor生命周期
  • akka学习教程(三) 不可变对象
  • akka学习教程(二)HelloWord
  • akka学习教程(一)简介

本文内容主要来自于来自于官方文档
- Cluster Usage
- Cluster Specification

akka集群概述

Akka群集提供容错分散的对等群集成员服务,没有单点故障或单点瓶颈。 它使用gossip协议和自动故障检测器。

术语

  • 节点
    群集的逻辑成员。 一台物理机上可能有多个节点。 定义格式是【主机名:port:uid】
  • 集群
    通过成员服务连接在一起的一组节点。
  • leader
    集群中充当领导者的单个节点。 管理集群和成员状态转换。

节点状态

  • akka.cluster.allow-weakly-up-members=off 时:

  • akka.cluster.allow-weakly-up-members=off 时:

  • 状态(Member States)

    • joining: 假如集群的临时状态
    • weakly up: 出现网络分区时的临时状态(仅当akka.cluster.allow-weakly-up-members = on时)
    • up: 正常工作状态
    • leaving / exiting: 正常删除
    • down: marked as down (no longer part of cluster decisions)
    • removed: tombstone state (no longer a member)
  • 动作(User Actions)
    • join: 一个节点加入到集群(can be explicit or automatic on startup if a node to join have been specified in the configuration)
    • leave: 优雅移除节点
    • down: 将一个节点标记为关闭
  • leader动作(Leader Actions)
    • 领导人有以下职责:将成员移入和移出群集

      • 加入 -> 正常运行
      • 退出 -> 删除
  • 故障检测和不可达性
    • fd *

      • the failure detector of one of the monitoring nodes has triggered causing the monitored node to be marked as unreachable
    • unreachable*
      • unreachable is not a real member states but more of a flag in addition to the state signaling that the cluster is unable to talk to this node, after being unreachable the failure detector may detect it as reachable again and thereby remove the flag

初始配置

引入akka-cluster的maven依赖

<dependency><groupId>com.typesafe.akka</groupId><artifactId>akka-cluster_2.11</artifactId><version>2.4.16</version>
</dependency>

修改配置文件reference.conf

akka {loglevel = "INFO"actor {provider = "akka.cluster.ClusterActorRefProvider"}remote {log-remote-lifecycle-events = offnetty.tcp {hostname = "127.0.0.1"port = 2551}}cluster {seed-nodes = ["akka.tcp://akkaClusterTest@127.0.0.1:2551","akka.tcp://akkaClusterTest@127.0.0.1:2552"]#//#snippet# excluded from snippetauto-down-unreachable-after = 10s#//#snippet# auto downing is NOT safe for production deployments.# you may want to use it during development, read more about it in the docs.#auto-down-unreachable-after = 10s# Disable legacy metrics in akka-cluster.metrics.enabled=off}
}# 持久化相关
akka.persistence.journal.plugin = "akka.persistence.journal.inmem"
# Absolute path to the default snapshot store plugin configuration entry.
akka.persistence.snapshot-store.plugin = "akka.persistence.snapshot-store.local"

编写测试代码(来自于自官网示例)

package akka.myCluster;import akka.actor.ActorRef;
import akka.actor.ActorSystem;
import akka.actor.Props;
import akka.actor.UntypedActor;
import akka.cluster.Cluster;
import akka.cluster.ClusterEvent;
import akka.cluster.ClusterEvent.MemberEvent;
import akka.cluster.ClusterEvent.MemberUp;
import akka.cluster.ClusterEvent.MemberRemoved;
import akka.cluster.ClusterEvent.UnreachableMember;
import akka.event.Logging;
import akka.event.LoggingAdapter;
import com.typesafe.config.ConfigFactory;public class SimpleClusterListener extends UntypedActor {LoggingAdapter log = Logging.getLogger(getContext().system(), this);Cluster cluster = Cluster.get(getContext().system());//subscribe to cluster changes@Overridepublic void preStart() {//#subscribecluster.subscribe(getSelf(), ClusterEvent.initialStateAsEvents(), MemberEvent.class, UnreachableMember.class);//#subscribe}//re-subscribe when restart@Overridepublic void postStop() {cluster.unsubscribe(getSelf());}@Overridepublic void onReceive(Object message) {if (message instanceof MemberUp) {MemberUp mUp = (MemberUp) message;log.info("Member is Up: {}", mUp.member());} else if (message instanceof UnreachableMember) {UnreachableMember mUnreachable = (UnreachableMember) message;log.info("Member detected as unreachable: {}", mUnreachable.member());} else if (message instanceof MemberRemoved) {MemberRemoved mRemoved = (MemberRemoved) message;log.info("Member is Removed: {}", mRemoved.member());} else if (message instanceof MemberEvent) {// ignore} else {unhandled(message);}}public static void main(String [] args){System.out.println("Start simpleClusterListener");ActorSystem system = ActorSystem.create("akkaClusterTest", ConfigFactory.load("reference.conf"));system.actorOf(Props.create(SimpleClusterListener.class), "simpleClusterListener");System.out.println("Started simpleClusterListener");}
}

将以上项目复制一份,并修改其端口为2552

运行结果

先启动2551服务:

Start simpleClusterListener
[INFO] [01/18/2017 15:39:14.886] [main] [akka.remote.Remoting] Starting remoting
[INFO] [01/18/2017 15:39:15.728] [main] [akka.remote.Remoting] Remoting started; listening on addresses :[akka.tcp://akkaClusterTest@127.0.0.1:2551]
[INFO] [01/18/2017 15:39:15.744] [main] [akka.cluster.Cluster(akka://akkaClusterTest)] Cluster Node [akka.tcp://akkaClusterTest@127.0.0.1:2551] - Starting up...
[INFO] [01/18/2017 15:39:15.853] [main] [akka.cluster.Cluster(akka://akkaClusterTest)] Cluster Node [akka.tcp://akkaClusterTest@127.0.0.1:2551] - Registered cluster JMX MBean [akka:type=Cluster]
[INFO] [01/18/2017 15:39:15.853] [main] [akka.cluster.Cluster(akka://akkaClusterTest)] Cluster Node [akka.tcp://akkaClusterTest@127.0.0.1:2551] - Started up successfully
Started simpleClusterListener
[WARN] [01/18/2017 15:39:15.900] [akkaClusterTest-akka.actor.default-dispatcher-18] [akka.tcp://akkaClusterTest@127.0.0.1:2551/system/cluster/core/daemon/downingProvider] Don't use auto-down feature of Akka Cluster in production. See 'Auto-downing (DO NOT USE)' section of Akka Cluster documentation.
[WARN] [01/18/2017 15:39:17.057] [akkaClusterTest-akka.remote.default-remote-dispatcher-8] [akka.tcp://akkaClusterTest@127.0.0.1:2551/system/endpointManager/reliableEndpointWriter-akka.tcp%3A%2F%2FakkaClusterTest%40127.0.0.1%3A2552-0] Association with remote system [akka.tcp://akkaClusterTest@127.0.0.1:2552] has failed, address is now gated for [5000] ms. Reason: [Association failed with [akka.tcp://akkaClusterTest@127.0.0.1:2552]] Caused by: [Connection refused: no further information: /127.0.0.1:2552]
[INFO] [01/18/2017 15:39:17.073] [akkaClusterTest-akka.actor.default-dispatcher-2] [akka://akkaClusterTest/deadLetters] Message [akka.cluster.InternalClusterAction$InitJoin$] from Actor[akka://akkaClusterTest/system/cluster/core/daemon/firstSeedNodeProcess-1#1750766313] to Actor[akka://akkaClusterTest/deadLetters] was not delivered. [1] dead letters encountered. This logging can be turned off or adjusted with configuration settings 'akka.log-dead-letters' and 'akka.log-dead-letters-during-shutdown'.
[INFO] [01/18/2017 15:39:17.073] [akkaClusterTest-akka.actor.default-dispatcher-2] [akka://akkaClusterTest/deadLetters] Message [akka.cluster.InternalClusterAction$InitJoin$] from Actor[akka://akkaClusterTest/system/cluster/core/daemon/firstSeedNodeProcess-1#1750766313] to Actor[akka://akkaClusterTest/deadLetters] was not delivered. [2] dead letters encountered. This logging can be turned off or adjusted with configuration settings 'akka.log-dead-letters' and 'akka.log-dead-letters-during-shutdown'.
[INFO] [01/18/2017 15:39:17.915] [akkaClusterTest-akka.actor.default-dispatcher-21] [akka://akkaClusterTest/deadLetters] Message [akka.cluster.InternalClusterAction$InitJoin$] from Actor[akka://akkaClusterTest/system/cluster/core/daemon/firstSeedNodeProcess-1#1750766313] to Actor[akka://akkaClusterTest/deadLetters] was not delivered. [3] dead letters encountered. This logging can be turned off or adjusted with configuration settings 'akka.log-dead-letters' and 'akka.log-dead-letters-during-shutdown'.
[INFO] [01/18/2017 15:39:18.914] [akkaClusterTest-akka.actor.default-dispatcher-17] [akka://akkaClusterTest/deadLetters] Message [akka.cluster.InternalClusterAction$InitJoin$] from Actor[akka://akkaClusterTest/system/cluster/core/daemon/firstSeedNodeProcess-1#1750766313] to Actor[akka://akkaClusterTest/deadLetters] was not delivered. [4] dead letters encountered. This logging can be turned off or adjusted with configuration settings 'akka.log-dead-letters' and 'akka.log-dead-letters-during-shutdown'.
[INFO] [01/18/2017 15:39:19.914] [akkaClusterTest-akka.actor.default-dispatcher-2] [akka://akkaClusterTest/deadLetters] Message [akka.cluster.InternalClusterAction$InitJoin$] from Actor[akka://akkaClusterTest/system/cluster/core/daemon/firstSeedNodeProcess-1#1750766313] to Actor[akka://akkaClusterTest/deadLetters] was not delivered. [5] dead letters encountered. This logging can be turned off or adjusted with configuration settings 'akka.log-dead-letters' and 'akka.log-dead-letters-during-shutdown'.
[INFO] [01/18/2017 15:39:21.008] [akkaClusterTest-akka.actor.default-dispatcher-19] [akka.cluster.Cluster(akka://akkaClusterTest)] Cluster Node [akka.tcp://akkaClusterTest@127.0.0.1:2551] - Node [akka.tcp://akkaClusterTest@127.0.0.1:2551] is JOINING, roles []
[INFO] [01/18/2017 15:39:21.024] [akkaClusterTest-akka.actor.default-dispatcher-19] [akka.cluster.Cluster(akka://akkaClusterTest)] Cluster Node [akka.tcp://akkaClusterTest@127.0.0.1:2551] - Leader is moving node [akka.tcp://akkaClusterTest@127.0.0.1:2551] to [Up]
[INFO] [01/18/2017 15:39:21.024] [akkaClusterTest-akka.actor.default-dispatcher-2] [akka://akkaClusterTest/user/simpleClusterListener] Member is Up: Member(address = akka.tcp://akkaClusterTest@127.0.0.1:2551, status = Up)

此时再启动2552端口,2552的输出日志为:

Start simpleClusterListener
[INFO] [01/18/2017 15:40:35.671] [main] [akka.remote.Remoting] Starting remoting
[INFO] [01/18/2017 15:40:36.458] [main] [akka.remote.Remoting] Remoting started; listening on addresses :[akka.tcp://akkaClusterTest@127.0.0.1:2552]
[INFO] [01/18/2017 15:40:36.474] [main] [akka.cluster.Cluster(akka://akkaClusterTest)] Cluster Node [akka.tcp://akkaClusterTest@127.0.0.1:2552] - Starting up...
[INFO] [01/18/2017 15:40:36.552] [main] [akka.cluster.Cluster(akka://akkaClusterTest)] Cluster Node [akka.tcp://akkaClusterTest@127.0.0.1:2552] - Registered cluster JMX MBean [akka:type=Cluster]
[INFO] [01/18/2017 15:40:36.552] [main] [akka.cluster.Cluster(akka://akkaClusterTest)] Cluster Node [akka.tcp://akkaClusterTest@127.0.0.1:2552] - Started up successfully
Started simpleClusterListener
[WARN] [01/18/2017 15:40:36.567] [akkaClusterTest-akka.actor.default-dispatcher-15] [akka.tcp://akkaClusterTest@127.0.0.1:2552/system/cluster/core/daemon/downingProvider] Don't use auto-down feature of Akka Cluster in production. See 'Auto-downing (DO NOT USE)' section of Akka Cluster documentation.
[INFO] [01/18/2017 15:40:37.067] [akkaClusterTest-akka.actor.default-dispatcher-2] [akka.cluster.Cluster(akka://akkaClusterTest)] Cluster Node [akka.tcp://akkaClusterTest@127.0.0.1:2552] - Welcome from [akka.tcp://akkaClusterTest@127.0.0.1:2551]
[INFO] [01/18/2017 15:40:37.082] [akkaClusterTest-akka.actor.default-dispatcher-4] [akka://akkaClusterTest/user/simpleClusterListener] Member is Up: Member(address = akka.tcp://akkaClusterTest@127.0.0.1:2551, status = Up)
[INFO] [01/18/2017 15:40:37.925] [akkaClusterTest-akka.actor.default-dispatcher-15] [akka://akkaClusterTest/user/simpleClusterListener] Member is Up: Member(address = akka.tcp://akkaClusterTest@127.0.0.1:2552, status = Up)

同时2551会增加几行日志:

[INFO] [01/18/2017 15:40:36.942] [akkaClusterTest-akka.actor.default-dispatcher-2] [akka.cluster.Cluster(akka://akkaClusterTest)] Cluster Node [akka.tcp://akkaClusterTest@127.0.0.1:2551] - Node [akka.tcp://akkaClusterTest@127.0.0.1:2552] is JOINING, roles []
[INFO] [01/18/2017 15:40:37.893] [akkaClusterTest-akka.actor.default-dispatcher-3] [akka.cluster.Cluster(akka://akkaClusterTest)] Cluster Node [akka.tcp://akkaClusterTest@127.0.0.1:2551] - Leader is moving node [akka.tcp://akkaClusterTest@127.0.0.1:2552] to [Up]
[INFO] [01/18/2017 15:40:37.893] [akkaClusterTest-akka.actor.default-dispatcher-4] [akka://akkaClusterTest/user/simpleClusterListener] Member is Up: Member(address = akka.tcp://akkaClusterTest@127.0.0.1:2552, status = Up)

上面日志中可以看到Akka集群中各个节点的状态迁移信息,第一个种子节点正在加入自身创建的集群时的状态时JOINING,由于第一个种子节点将自己率先选举为Leader,因此它还将自己的状态改变为Up。后面它还将第二个种子节点和第三个节点从JOINING转换到Up状态。

同样再添加一个2553端口的服务,与上面一样,就不多说了。
我们现在吧2553服务停止,看看日志输出:

[akka.cluster.Cluster(akka://akkaClusterTest)] Cluster Node [akka.tcp://akkaClusterTest@127.0.0.1:2551] - Leader is auto-downing unreachable node [akka.tcp://akkaClusterTest@127.0.0.1:2553]. Don't use auto-down feature of Akka Cluster in production. See 'Auto-downing (DO NOT USE)' section of Akka Cluster documentation.
[akka.cluster.Cluster(akka://akkaClusterTest)] Cluster Node [akka.tcp://akkaClusterTest@127.0.0.1:2551] - Marking unreachable node [akka.tcp://akkaClusterTest@127.0.0.1:2553] as [Down]
[akka.cluster.Cluster(akka://akkaClusterTest)] Cluster Node [akka.tcp://akkaClusterTest@127.0.0.1:2551] - Leader is removing unreachable node [akka.tcp://akkaClusterTest@127.0.0.1:2553]
[akka://akkaClusterTest/user/simpleClusterListener] Member is Removed: Member(address = akka.tcp://akkaClusterTest@127.0.0.1:2553, status = Removed)

2553被标记为Removed

注意: 我们在配置文件中配置了auto-down-unreachable-after=10s。所以在2553关闭10s后才会真正将其移除。

参考资料

  • 书籍《java高并发程序设计》
  • AKKA官方文档

akka学习教程(十三) akka分布式相关推荐

  1. akka学习教程(十四) akka分布式实战

    akka系列文章目录 akka学习教程(十四) akka分布式实战 akka学习教程(十三) akka分布式 akka学习教程(十二) Spring与Akka的集成 akka学习教程(十一) akka ...

  2. 【STM32】标准库与HAL库对照学习教程十三--软件IIC控制AT24C02

    [STM32]标准库与HAL库对照学习教程十三--软件IIC控制AT24C02 一.前言 二.准备工作 三.AT24C02(EEPROM)介绍 1.AT24C02简介 2.引脚功能 3.设备地址 四. ...

  3. 【转】JMeter学习(十三)分布式部署

    Jmeter 是Java 应用,对于CPU和内存的消耗比较大,因此,当需要模拟数以千计的并发用户时,使用单台机器模拟所有的并发用户就有些力不从心,甚至会引起JAVA内存溢出错误.为了让jmeter工具 ...

  4. Akka 学习(九)Akka Cluster

    参考文章 Gitter Chat,Akka 在线交流平台 Akka Forums,Akka 论坛 Akka in GitHub,Akka 开源项目仓库 Akka Official Website,Ak ...

  5. (转)Akka学习笔记

    Akka学习笔记系列文章: <Akka学习笔记:ACTORS介绍> <Akka学习笔记:Actor消息传递(1)> <Akka学习笔记:Actor消息传递(2)> ...

  6. Akka 学习(四)Remote Actor

    目录 一 介绍 1.1 Remote Actor 1.2 适用场景 1.3 踩坑点 二 实战 2.1 需求 2.2 Java 版本 2.2.1 效果图 2.2.2 实体类 2.2.3 服务端Actor ...

  7. Akka 指南 之「Akka 简介」

    温馨提示:Akka 中文指南的 GitHub 地址为「akka-guide」,欢迎大家Star.Fork,纠错. Akka 简介 欢迎来到 Akka,它是一组用于设计跨越处理器和网络的可扩展.弹性系统 ...

  8. java akka 实战_《Akka实战:快速构建高可用分布式应用》(杜云飞)【摘要 书评 试读】- 京东图书...

    Akka 是一款优秀的分布式并发框架,虽然它是基于 Scala 语言实现的,但我们却可轻松地将其运行在JVM上,在不改变现有架构的基础上支持更高的并发量.另一方面,Akka 是一款轻量级开源技术,它既 ...

  9. netapp学习(十三)---Snapshot基础知识(上)

    Because each Snapshot contains only pointers and blocks that have changed, the size of the Snapshot ...

最新文章

  1. hadoopStreaming---使用Python编写MapReduce
  2. python类、对象、方法、属性之类与对象笔记
  3. 8家云计算及安全巨头联合成立云安全服务联盟
  4. 【Google Play】应用 “更新被拒“ 后续处理 ( 上传新版本后 , 一定要停用被拒的版本, 才可以通过审核 | 停用被拒的版本 | 送审 )
  5. Day10-Python3基础-协程、异步IO、redis缓存、rabbitMQ队列
  6. 查询DBA_HIST_ACTIVE_SESS_HISTORY缓慢
  7. 如何从Internet Explorer或Edge迁移到Chrome(以及为什么要迁移)
  8. 动态规划训练24 [Phalanx HDU - 2859 ]
  9. 在Sql Server 2005使用公用表表达式CTE简化复杂的查询语句
  10. 【C语言简介】C语言的前世今生
  11. Storm概念学习系列之Stream消息流 和 Stream Grouping 消息流组
  12. C语言文件读写(输入输出重定向)
  13. Ubuntu系统安装 - 单系统
  14. kubernetes快速部署及常用命令
  15. 深入OpenJDK源码全面理解Java类加载器(下 -- Java源码篇)
  16. jQ知识补全(供已经入门jq开发者)
  17. 使用jsoup简单爬取微信公众号一些图片
  18. Linux中rar解压软件
  19. testbench——文件读入输出
  20. word公式编号及交叉引用技巧

热门文章

  1. 《微信视频号:内容定位、制作、运营与直播卖货》速读笔记
  2. B2C电商项目(第十二天、微信扫码支付、支付二维码、支付回调逻辑、推送支付通知)
  3. 【报告分享】激荡向前,乘风破浪的食品新国货-一面数据(附下载)
  4. Java 中各种数据类型的转换
  5. bat批处理修改文件夹下文件名字
  6. 修改oracle数据库的名称
  7. git 分支合并(dev合并到master分支)
  8. php 判断字符存在,PHP判断字符串str中是否存在某个值
  9. 配置文件的读取-TOML
  10. 七大多用户商城系统特性对比