AKKA文档(java版)—容错
正如角色系统这一章中解释的一样,每一个角色都是它孩子的监管者,并且像这样的角色都会定义错误处理监管策略。这个策略在成为角色系统结构的一个完整部分之后是不能被改变的。
错误处理实践
首先,让我们看一个处理数据存储错误的例子,它是实践应用中一个典型的错误根源。当然它基于真实应用,这个应用的数据存储有可能是无效的,但我们在这个例子中会用一个最有效的重连方法来实现。
读下面的源代码。内嵌的注释解释了错误处理的不同块和为什么要添加它们。强烈的建议去运行这个例子,这样才能更简单的去跟踪这个日志输出,来了解运行的时候发生了什么。
容错例子的图解(Diagrams of the Fault Tolerance Sample)
容错例子的全部源代码(Full Source Code of the Fault Tolerance Sample)
创建一个监管策略
下面的章节解释了错误处理机制和更深入的选择。
根据示范的目的,让我们来考虑如下策略:
01
|
private static SupervisorStrategy strategy =
|
02
|
new OneForOneStrategy( 10 , Duration.create( "1 minute" ),
|
03
|
new Function<Throwable, Directive>() {
|
04
|
@Override
|
05
|
public Directive apply(Throwable t) {
|
06
|
if (t instanceof ArithmeticException) {
|
07
|
return resume();
|
08
|
} else if (t instanceof NullPointerException) {
|
09
|
return restart();
|
10
|
} else if (t instanceof IllegalArgumentException) {
|
11
|
return stop();
|
12
|
} else {
|
13
|
return escalate();
|
14
|
}
|
15
|
}
|
16
|
});
|
17
|
18
|
@Override
|
19
|
public SupervisorStrategy supervisorStrategy() {
|
20
|
return strategy;
|
21
|
}
|
我选择了一些大家熟知的异常类型,是为了展示在监管和监视章节中描述的错误处理指令的应用。首先,它是一个一对一的策略,意味着每一个孩子都是分开处理的(多对一的策略工作非常类似,唯一的不同就是任何决策都会应用到监管者的所有孩子,不仅仅是发生错误的那个)。在重启频率上会有一些限制,最大是每分钟重启10次。-1和Duration.Inf()意味着限制没有应用,抛开这个可能性,去指定一个绝对的上限或者去让这个重启的工作没有上限。当超出这个限制,孩子角色就被停止。
注意:如果策略在监管角色(而不是一个单独的类)中描述了,它的决策者可以在线程安全的形势下访问角色的所有内部状态,包括获得当前发生错误的孩子的引用(例如错误消息的getSender)。
默认监管策略
如果定义的策略没有覆盖抛出的异常,Escalate会被使用。当没有为一个角色定义监管策略,如下的异常会按照默认来处理:
1. ActorInitializationException会停止发生错误的子角色
2. ActorKilledException会停止发生错误的子角色
3. Exception会重启发生错误的子角色
4. 别的抛出类型会升级到父角色
如果异常升级到根监管者,会按上述的默认策略处理。
停止监管策略
跟Erlang方式类似的策略是当它们失败的时候只停止子角色,以及当DeathWatch通知丢失的子角色的时候会对监管者采取正确的动作。
记录角色失败的信息
默认SupervisorStrategy会记录失败信息除非它们被向上升级。建议在更高层次的结构中处理上升的错误,并潜在的记录下来。
在初始化的时候你可以通过设置SupervisorStrategy的loggingEnabled为false来去掉默认的日志。可以在Decider里定制日志。注意如getSender一样,当SupervisorStrategy在监管角色中描述,当前失败的子角色引用是有效的。
你可以通重写logFailure方法在你自己的SupervisorStrategy实现中定制化日志。
最高层次角色的监管
最高层次角色意味着它们是通过system.actorOf()创建的,并且它们是User Guardian的孩子。在这种情况下没有特定的规则,守护者仅仅应用配置策略。
测试应用
下面章节展示了实践中不同指令的效果,wherefor测试启动是需要的。首先,我们需要一个匹配的监管者。
01
|
public class Supervisor extends UntypedActor {
|
02
|
03
|
private static SupervisorStrategy strategy =
|
04
|
new OneForOneStrategy( 10 , Duration.create( "1 minute" ),
|
05
|
new Function<Throwable, Directive>() {
|
06
|
@Override
|
07
|
public Directive apply(Throwable t) {
|
08
|
if (t instanceof ArithmeticException) {
|
09
|
return resume();
|
10
|
} else if (t instanceof NullPointerException) {
|
11
|
return restart();
|
12
|
} else if (t instanceof IllegalArgumentException) {
|
13
|
return stop();
|
14
|
} else {
|
15
|
return escalate();
|
16
|
}
|
17
|
}
|
18
|
});
|
19
|
20
|
@Override
|
21
|
public SupervisorStrategy supervisorStrategy() {
|
22
|
return strategy;
|
23
|
}
|
24
|
25
|
public void onReceive(Object o) {
|
26
|
if (o instanceof Props) {
|
27
|
getSender().tell(getContext().actorOf((Props) o), getSelf());
|
28
|
} else {
|
29
|
unhandled(o);
|
30
|
}
|
31
|
}
|
32
|
}
|
这个监管者会被用于创建子角色,我们可以实验一下:
01
|
public class Child extends UntypedActor {
|
02
|
int state = 0 ;
|
03
|
04
|
public void onReceive(Object o) throws Exception {
|
05
|
if (o instanceof Exception) {
|
06
|
throw (Exception) o;
|
07
|
} else if (o instanceof Integer) {
|
08
|
state = (Integer) o;
|
09
|
} else if (o.equals( "get" )) {
|
10
|
getSender().tell(state, getSelf());
|
11
|
} else {
|
12
|
unhandled(o);
|
13
|
}
|
14
|
}
|
15
|
}
|
测试使用Testing Actor Systems里介绍的实用工具会比较简单,TestProbe提供一个Actor Ref用于接收和检查消息回复。
01
|
import akka.actor.ActorRef;
|
02
|
import akka.actor.ActorSystem;
|
03
|
import akka.actor.SupervisorStrategy;
|
04
|
import static akka.actor.SupervisorStrategy.resume;
|
05
|
import static akka.actor.SupervisorStrategy.restart;
|
06
|
import static akka.actor.SupervisorStrategy.stop;
|
07
|
import static akka.actor.SupervisorStrategy.escalate;
|
08
|
import akka.actor.SupervisorStrategy.Directive;
|
09
|
import akka.actor.OneForOneStrategy;
|
10
|
import akka.actor.Props;
|
11
|
import akka.actor.Terminated;
|
12
|
import akka.actor.UntypedActor;
|
13
|
import scala.collection.immutable.Seq;
|
14
|
import scala.concurrent.Await;
|
15
|
import static akka.pattern.Patterns.ask;
|
16
|
import scala.concurrent.duration.Duration;
|
17
|
import akka.testkit.AkkaSpec;
|
18
|
import akka.testkit.TestProbe;
|
19
|
20
|
public class FaultHandlingTest {
|
21
|
static ActorSystem system;
|
22
|
Duration timeout = Duration.create( 5 , SECONDS);
|
23
|
24
|
@BeforeClass
|
25
|
public static void start() {
|
26
|
system = ActorSystem.create( "test" , AkkaSpec.testConf());
|
27
|
}
|
28
|
29
|
@AfterClass
|
30
|
public static void cleanup() {
|
31
|
JavaTestKit.shutdownActorSystem(system);
|
32
|
system = null ;
|
33
|
}
|
34
|
35
|
@Test
|
36
|
public void mustEmploySupervisorStrategy() throws Exception {
|
37
|
// code here
|
38
|
}
|
39
|
40
|
}
|
让我们创建角色
1
|
Props superprops = Props.create(Supervisor. class );
|
2
|
ActorRef supervisor = system.actorOf(superprops, "supervisor" );
|
3
|
ActorRef child = (ActorRef) Await.result(ask(supervisor,
|
4
|
Props.create(Child. class ), 5000 ), timeout);
|
第一个测试会演示Resume指令,所以我们尝试通过把角色的状态设置成非初始化状态并让它失败:
1
|
child.tell( 42 , ActorRef.noSender());
|
2
|
assert Await.result(ask(child, "get" , 5000 ), timeout).equals( 42 );
|
3
|
child.tell( new ArithmeticException(), ActorRef.noSender());
|
4
|
assert Await.result(ask(child, "get" , 5000 ), timeout).equals( 42 );
|
你可以看到值42让错误处理指令存活下来。现在,如果我们把错误改成一个更严重的NullPointerException异常,则将不会是这种情况:
1
|
child.tell( new NullPointerException(), ActorRef.noSender());
|
2
|
assert Await.result(ask(child, "get" , 5000 ), timeout).equals( 0 );
|
发生IllegalArgumentException致命异常的情况下,最终会导致子角色会被监管者中断:
1
|
final TestProbe probe = new TestProbe(system);
|
2
|
probe.watch(child);
|
3
|
child.tell( new IllegalArgumentException(), ActorRef.noSender());
|
4
|
probe.expectMsgClass(Terminated. class );
|
到目前为止,监管者还没有完全受到子角色失败的影响,因为指令集会处理它。万一抛出一个异常,监管者会向上抛出错误。
1
|
child = (ActorRef) Await.result(ask(supervisor,
|
2
|
Props.create(Child. class ), 5000 ), timeout);
|
3
|
probe.watch(child);
|
4
|
assert Await.result(ask(child, "get" , 5000 ), timeout).equals( 0 );
|
5
|
child.tell( new Exception(), ActorRef.noSender());
|
6
|
probe.expectMsgClass(Terminated. class );
|
监管者自己是由ActorSystem提供的最高等级的角色监管的,在所有的异常(注意这两个异常
ActorInitializationException和ActorKilledException)情况下,默认策略是重新启动。一旦默认的指令重启去杀死所有的孩子,我们期望我们的穷孩子不要幸存这个错误。
这不是所希望的(这依赖于用例),我们需要去用一个不同的监管者重写这个行为。
01
|
public class Supervisor2 extends UntypedActor {
|
02
|
03
|
private static SupervisorStrategy strategy = new OneForOneStrategy( 10 ,
|
04
|
Duration.create( "1 minute" ),
|
05
|
new Function<Throwable, Directive>() {
|
06
|
@Override
|
07
|
public Directive apply(Throwable t) {
|
08
|
if (t instanceof ArithmeticException) {
|
09
|
return resume();
|
10
|
} else if (t instanceof NullPointerException) {
|
11
|
return restart();
|
12
|
} else if (t instanceof IllegalArgumentException) {
|
13
|
return stop();
|
14
|
} else {
|
15
|
return escalate();
|
16
|
}
|
17
|
}
|
18
|
});
|
19
|
20
|
@Override
|
21
|
public SupervisorStrategy supervisorStrategy() {
|
22
|
return strategy;
|
23
|
}
|
24
|
25
|
public void onReceive(Object o) {
|
26
|
if (o instanceof Props) {
|
27
|
getSender().tell(getContext().actorOf((Props) o), getSelf());
|
28
|
} else {
|
29
|
unhandled(o);
|
30
|
}
|
31
|
}
|
32
|
33
|
@Override
|
34
|
public void preRestart(Throwable cause, Option<Object> msg) {
|
35
|
// do not kill all children, which is the default here
|
36
|
}
|
37
|
}
|
通过父亲,孩子角色幸存向上升级重启,如最后一段测试代码所演示的:
1
|
superprops = Props.create(Supervisor2. class );
|
2
|
supervisor = system.actorOf(superprops);
|
3
|
child = (ActorRef) Await.result(ask(supervisor,
|
4
|
Props.create(Child. class ), 5000 ), timeout);
|
5
|
child.tell( 23 , ActorRef.noSender());
|
6
|
assert Await.result(ask(child, "get" , 5000 ), timeout).equals( 23 );
|
7
|
child.tell( new Exception(), ActorRef.noSender());
|
8
|
assert Await.result(ask(child, "get" , 5000 ), timeout).equals( 0 );
|
- 转载自 并发编程网 - ifeve.com
AKKA文档(java版)—容错相关推荐
- 20165234 [第二届构建之法论坛] 预培训文档(Java版) 学习总结
[第二届构建之法论坛] 预培训文档(Java版) 学习总结 我通读并学习了此文档,并且动手实践了一遍.以下是我学习过程的记录~ Part1.配置环境 配置JDK 原文中提到了2个容易被混淆的概念 JD ...
- [第二届构建之法论坛] 预培训文档(Java版)
本博客是第二届构建之法论坛暨软件工程培训活动预培训文档中[适用于结对编程部分的Java版本],需要实验者有一部分Java基础. 目录 Part0.背景 Part1.配置环境 配置JDK Linux 平 ...
- AKKA文档(java版)——准备开始
http://ifeve.com/akka-doc-java-getting-started/ AKKA文档(java版)--准备开始 原文:http://doc.akka.io/docs/ak ...
- AKKA文档(java)——术语,概念
原文:http://doc.akka.io/docs/akka/2.3.6/general/terminology.html 译者:吴京润 本章我们试图建立一个通用的术语列表,用来定义有关并发和分布式 ...
- ONLYOFFICE 文档开发者版
ONLYOFFICE 文档开发者版 ONLYOFFICE Docs 是一款功能强大的在线编辑器,适用于文本文档.电子表格.演示文稿和表格.创建复杂的文档.专业的电子表格和令人惊叹的演示文稿.支持的常用 ...
- 腾讯文档网页版登录提示服务器,腾讯文档官网地址,腾讯文档电脑版pc端登录入口...
腾讯文档是腾讯近期推出的一款多人协作在线文档产品,不但打通QQ.微信和PC等多个平台,哪么腾讯文档官网地址是什么?有没有电脑版,想在pc端编辑文档怎么办呢?腾讯文档没有电脑版软件,但腾讯文档网页版可以 ...
- 石墨计算机,石墨文档电脑版
石墨文档电脑版 正式版 1.2.4 「桌面客户端」V1.2.4 新版本 6月19 日已上线 修复了客户端窗口不能拖动和缩放的问题 石墨文档 最新版 1.2.7.4 文档 支持插入日期并可设置通知提醒自 ...
- 智慧文档手机版隐私政策
智慧文档手机版手机客户端软件最终用户使用授权协议如下 本最终用户软件授权协议(以下简称"协议")是由您,作为最终用户,与智慧文档手机版手机客户端软件开放方 成都奇异智慧科技有限公司 ...
- 使用计算机编辑文档的同时,小学信息技术2-2-用计算机编辑文档(北京版).doc
小学信息技术2-2-用计算机编辑文档(北京版).doc 2015年入学 2021年毕业第二册 第二单元 用计算机编辑文档一.教学要求 1学习一种文字处理软件,使学生能够使用计算机编辑文件,进行一些文字 ...
最新文章
- 里加一列为1_一素一菩提 @ 素牛排薯泥amp;百香龙珠气泡饮
- 89C51单片机定时器控制的流水灯
- 关于NPN和PNP传感器的应用区别(转载)
- C/C++fflush(stdout)循环打印输出避免缓存区错误
- 图解TCPIP-以太网(物理层)
- 一篇文章看懂Java并发和线程安全
- java基础-(二)-第一个java程序
- 约瑟夫问题(Josephus problem)详解
- Markdown 语法手册全
- 贪婪的大脑:为何人类会无止境地寻求意义 目录
- Python Str字符串 字符串常用方法 定义 创建 拆分 成分判断 大小写调整 格式化 填充 替换 访问 查找
- JavaSE学习总结第01天_Java概述
- svg html5 编辑工具,HTML5之SVG
- 手机如何压缩jpg图片的大小?手把手教你快速压缩jpg格式图片
- 服务器常用的cpu型号,服务器CPU你又知多少?多款型号各个数
- 高富帅的颜色插值方法:在视觉感知线性变化的色彩空间中进行颜色插值
- 面试官:为啥要axios 的二次封装呢 及其使用是干啥的
- 你与chatGPT有什么区别?
- 201421123042 《Java程序设计》第5周学习总结
- 一文弄懂区块链技术原理
热门文章
- 【C++】使用setprecision控制输出流显示浮点数的有效数字个数
- c语言命令行参数怎么输入_C语言程序设计——这个函数原型你看懂了吗
- redis数据类型 - Hash类型
- MySQL 时间戳转换成秒
- Eclipse 启动项目错误:class not found
- jdbc oracle clob blob long类型数据
- React 源码剖析系列 - 不可思议的 react diff
- [Swust OJ 166]--方程的解数(hash法)
- linux内核软中断引起大量丢包
- 对于未定义为 System.String 的列,唯一有效的值是(引发异常)。