
Akka in JAVA (Part 3)

The previous two parts covered Akka's basics and common usage patterns. This part covers Akka's remote invocation and its cluster support. Since most of today's projects are distributed, standalone applications have become almost as rare as pandas, which makes Akka's remoting and clustering all the more necessary.

Remoting

Akka remoting is designed around peer-to-peer (P2P) communication, i.e. direct endpoint-to-endpoint messaging. In particular, Akka remoting does not work behind network address translation or load balancers.
However, because Akka was designed from the start with remote invocation and distribution in mind, using Akka remoting is very simple, almost transparent, and nearly identical to local usage. The only differences are that messages sent across the wire must be serializable, and the paths used to create and look up actors differ slightly.

Preparing for remoting

Using Akka remoting in a project is simple: just add the akka-remote Maven dependency.

<dependency>
    <groupId>com.typesafe.akka</groupId>
    <artifactId>akka-remote_2.11</artifactId>
    <version>2.4.1</version>
</dependency>

Configuration

Akka provides almost no dedicated API for remoting; the difference lies entirely in configuration. So the next step is to modify the project's Akka configuration:

WCMapReduceApp {
  akka {
    actor {
      provider = "akka.remote.RemoteActorRefProvider"
    }
    remote {
      enabled-transports = ["akka.remote.netty.tcp"]
      netty.tcp {
        hostname = "127.0.0.1"
        port = 2552       # 0 means pick any available port
      }
    }
  }
}

Let's see how this differs from a local configuration.
It adds an actor section under akka and sets provider, the actor provider, to akka.remote.RemoteActorRefProvider, i.e. the remote actor provider.
It then defines the remote transport, serving over Netty via akka.remote.netty.tcp, with the service bound to IP 127.0.0.1 and port 2552. That's all. And because the communication is peer-to-peer, the client configuration looks the same as the server's.

The above is only the minimal configuration for remoting; the full set of options is as follows:

akka {
  actor {
    # serializers
    serializers {
      proto = "akka.serialization.ProtobufSerializer"
    }
    # remote deployment
    deployment {
      default {
        # manually specify the actor's remote address
        # if this is set to a valid remote address, the named actor will be deployed
        # at that node e.g. "akka://sys@host:port"
        remote = ""
        # deployment target
        target {
          # A list of hostnames and ports for instantiating the children of a
          # non-direct router
          #   The format should be on "akka://sys@host:port", where:
          #    - sys is the remote actor system name
          #    - hostname can be either hostname or IP address the remote actor
          #      should connect to
          #    - port should be the port for the remote server on the other node
          # The number of actor instances to be spawned is still taken from the
          # nr-of-instances setting as for local routers; the instances will be
          # distributed round-robin among the given nodes.
          nodes = []
        }
      }
    }
  }
  remote {
    # transport implementation
    # Which implementation of akka.remote.RemoteTransport to use
    # default is a TCP-based remote transport based on Netty
    transport = "akka.remote.netty.NettyRemoteTransport"

    # trusted/untrusted mode
    # Enable untrusted mode for full security of server managed actors, allows
    # untrusted clients to connect.
    untrusted-mode = off

    # ACK timeout for cluster operations
    # Timeout for ACK of cluster operations, like checking actor out etc.
    remote-daemon-ack-timeout = 30s

    # log inbound messages?
    # If this is "on", Akka will log all inbound messages at DEBUG level, if off then they are not logged
    log-received-messages = off

    # log outbound messages?
    # If this is "on", Akka will log all outbound messages at DEBUG level, if off then they are not logged
    log-sent-messages = off

    # Each property is annotated with (I) or (O) or (I&O), where I stands for "inbound" and O for "outbound" connections.
    # The NettyRemoteTransport always starts the server role to allow inbound connections, and it starts
    # active client connections whenever sending to a destination which is not yet connected; if configured
    # it reuses inbound connections for replies, which is called a passive client connection (i.e. from server
    # to client).
    netty {
      # backoff timeout under latency/overflow
      # (O) In case of increased latency / overflow how long should we wait (blocking the sender)
      # until we deem the send to be cancelled?
      # 0 means "never backoff", any positive number will indicate time to block at most.
      backoff-timeout = 0ms

      # secure cookie
      # (I&O) Generate your own with '$AKKA_HOME/scripts/generate_config_with_secure_cookie.sh'
      # or using 'akka.util.Crypt.generateSecureCookie'
      secure-cookie = ""

      # require the cookie?
      # (I) Should the remote server require that its peers share the same secure-cookie
      # (defined in the 'remote' section)?
      require-cookie = off

      # use passive connections
      # (I) Reuse inbound connections for outbound messages
      use-passive-connections = on

      # hostname / bind address
      # (I) The hostname or ip to bind the remoting to,
      # InetAddress.getLocalHost.getHostAddress is used if empty
      hostname = ""

      # port
      # (I) The default remote server port clients should connect to.
      # Default is 2552 (AKKA), use 0 if you want a random available port
      port = 2552

      # (O) The address of a local network interface (IP Address) to bind to when creating
      # outbound connections. Set to "" or "auto" for automatic selection of local address.
      outbound-local-address = "auto"

      # message frame size
      # (I&O) Increase this if you want to be able to send messages with large payloads
      message-frame-size = 1 MiB

      # connection timeout
      # (O) Timeout duration
      connection-timeout = 120s

      # connection backlog size
      # (I) Sets the size of the connection backlog
      backlog = 4096

      # execution pool keep-alive
      # (I) Length in akka.time-unit how long core threads will be kept alive if idling
      execution-pool-keepalive = 60s

      # execution pool size
      # (I) Size of the core pool of the remote execution unit
      execution-pool-size = 4

      # maximum channel memory size
      # (I) Maximum channel size, 0 for off
      max-channel-memory-size = 0b

      # maximum total memory size of all channels
      # (I) Maximum total size of all channels, 0 for off
      max-total-memory-size = 0b

      # reconnect delay
      # (O) Time between reconnect attempts for active clients
      reconnect-delay = 5s

      # read timeout
      # (O) Read inactivity period (lowest resolution is seconds)
      # after which active client connection is shutdown;
      # will be re-established in case of new communication requests.
      # A value of 0 will turn this feature off
      read-timeout = 0s

      # write timeout
      # (O) Write inactivity period (lowest resolution is seconds)
      # after which a heartbeat is sent across the wire.
      # A value of 0 will turn this feature off
      write-timeout = 10s

      # combined read/write timeout
      # (O) Inactivity period of both reads and writes (lowest resolution is seconds)
      # after which active client connection is shutdown;
      # will be re-established in case of new communication requests
      # A value of 0 will turn this feature off
      all-timeout = 0s

      # reconnection time window
      # (O) Maximum time window that a client should try to reconnect for
      reconnection-time-window = 600s
    }

    # The dispatcher used for the system actor "network-event-sender"
    network-event-sender-dispatcher {
      executor = thread-pool-executor
      type = PinnedDispatcher
    }
  }
}

Creating a remote actor

With the configuration above in place, creating a remote actor in code is trivial; it barely feels like a remote actor at all.

Just use the above configuration when creating the ActorSystem; everything after that is exactly the same as with local actors:

ActorSystem system = ActorSystem.create("WCMapReduceApp",
        ConfigFactory.load("application").getConfig("WCMapReduceApp"));

Actors created through this ActorSystem get paths like akka.tcp://WCMapReduceApp@127.0.0.1:2552/user/remoteActor, i.e. remote actor paths.

Besides creating the service directly on the server side, a client can also ask the remote side to create an actor and keep a reference to it.

This again only requires a configuration change:

akka {
  actor {
    deployment {
      /sampleActor {
        remote = "akka.tcp://sampleActorSystem@127.0.0.1:2553"
      }
    }
  }
}

This configuration tells Akka that when /sampleActor is created, i.e. via system.actorOf(), the actor is not created locally; instead the remote ActorSystem is asked to create it.

Looking up a remote actor

Once a remote actor has been published, the client needs to invoke it, and the precondition for sending it messages is finding it.

Looking up a remote actor is also simple. Every remote actor has its own path of the form akka.tcp://<actorsystemname>@<hostname>:<port>/<actor path>, for example the akka.tcp://WCMapReduceApp@127.0.0.1:2552/user/remoteActor mentioned above. To obtain an ActorRef for it, pass this path to actorFor or actorSelection (note that actorFor is deprecated in newer Akka versions; actorSelection is preferred).

final ActorRef remoteActor = system.actorFor("akka.tcp://WCMapReduceApp@127.0.0.1:2552/user/WCMapReduceActor");

From here on, the actor is used exactly like a local one.
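To make the path format above concrete, here is a small plain-Java sketch that picks such a path apart into its parts. It involves no Akka API; the class and helper names are mine, for illustration only.

```java
import java.util.regex.Matcher;
import java.util.regex.Pattern;

public class ActorPathParts {
    // protocol://system@host:port/path, e.g.
    // akka.tcp://WCMapReduceApp@127.0.0.1:2552/user/WCMapReduceActor
    private static final Pattern PATH =
            Pattern.compile("^(akka(?:\\.tcp)?)://([^@]+)@([^:]+):(\\d+)(/.*)$");

    // returns {protocol, system name, hostname, port, actor path}
    static String[] parse(String actorPath) {
        Matcher m = PATH.matcher(actorPath);
        if (!m.matches())
            throw new IllegalArgumentException("not a remote actor path: " + actorPath);
        return new String[]{m.group(1), m.group(2), m.group(3), m.group(4), m.group(5)};
    }

    public static void main(String[] args) {
        String[] p = parse("akka.tcp://WCMapReduceApp@127.0.0.1:2552/user/WCMapReduceActor");
        // prints: WCMapReduceApp @ 127.0.0.1:2552 -> /user/WCMapReduceActor
        System.out.println(p[1] + " @ " + p[2] + ":" + p[3] + " -> " + p[4]);
    }
}
```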

Serialization

Remote invocation involves message serialization. Akka ships with several serialization mechanisms and also offers a serialization extension point: you can use a built-in serializer or implement your own.

Choosing a serializer is done in configuration: set the serializers option in the akka section, naming the implementation classes:

akka {
  actor {
    serializers {
      java = "akka.serialization.JavaSerializer"             # default for local messaging
      proto = "akka.remote.serialization.ProtobufSerializer" # default for remoting
      myown = "cn.sunxiang0918.akka.demo5.CustomSerializer"
    }
  }
}

With the serializers configured, you can also bind message types to serializers:

akka {
  actor {
    serialization-bindings {
      "java.lang.String" = java
      "akka.docs.serialization.Customer" = java
      "com.google.protobuf.Message" = proto
      "akka.docs.serialization.MyOwnSerializable" = myown
      "java.lang.Boolean" = myown
    }
  }
}

This configuration assigns each message type its own serialization mechanism.

If Akka's built-in serializers don't meet your needs, you can define a custom serializer class (usually unnecessary).
This is also straightforward: extend the JSerializer class and implement its methods:

import akka.actor.*;
import akka.serialization.*;
import com.typesafe.config.*;

public static class MyOwnSerializer extends JSerializer {

    // This is whether "fromBinary" requires a "clazz" or not
    @Override
    public boolean includeManifest() {
        return false;
    }

    // Pick a unique identifier for your Serializer,
    // you've got a couple of billions to choose from,
    // 0 - 16 is reserved by Akka itself
    @Override
    public int identifier() {
        return 1234567;
    }

    // "toBinary" serializes the given object to an Array of Bytes
    @Override
    public byte[] toBinary(Object obj) {
        // serialization logic goes here
    }

    // "fromBinary" deserializes the given array,
    // using the type hint (if any, see "includeManifest" above)
    @Override
    public Object fromBinaryJava(byte[] bytes, Class<?> clazz) {
        // deserialization logic goes here
    }
}
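As an illustration of the contract toBinary/fromBinary must satisfy, here is a plain-Java sketch (no Akka involved; the class name is mine) that serializes a String to UTF-8 bytes and back. The essential property is that fromBinary(toBinary(x)) returns an object equal to x:

```java
import java.nio.charset.StandardCharsets;

public class StringRoundTrip {
    // "toBinary": object -> bytes
    static byte[] toBinary(String obj) {
        return obj.getBytes(StandardCharsets.UTF_8);
    }

    // "fromBinary": bytes -> object
    static String fromBinary(byte[] bytes) {
        return new String(bytes, StandardCharsets.UTF_8);
    }

    public static void main(String[] args) {
        String msg = "hello akka";
        // the round trip must reproduce the original message
        System.out.println(fromBinary(toBinary(msg)));
    }
}
```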

Routing to remote actors

Because Akka remoting is point-to-point, it cannot readily use network-level load balancing. We can solve this with Akka's own routing: add router parameters when configuring the remote actor deployment.

akka.actor.deployment {
  /parent/remotePool {
    router = round-robin-pool
    nr-of-instances = 10
    target.nodes = ["akka.tcp://app@10.0.0.2:2552", "akka.tcp://app@10.0.0.3:2552"]
  }
}

When messages are sent to /parent/remotePool, they are distributed round-robin across the different remote nodes, giving both high availability and load balancing.
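The round-robin distribution itself can be sketched in a few lines of plain Java (class and method names are mine, not Akka's): a counter modulo the node count selects the next target.

```java
import java.util.concurrent.atomic.AtomicLong;

public class RoundRobinSketch {
    private final String[] nodes;
    private final AtomicLong counter = new AtomicLong();

    RoundRobinSketch(String... nodes) {
        this.nodes = nodes;
    }

    // each message goes to the next node in turn, wrapping around
    String route() {
        int index = (int) (counter.getAndIncrement() % nodes.length);
        return nodes[index];
    }

    public static void main(String[] args) {
        RoundRobinSketch router = new RoundRobinSketch(
                "akka.tcp://app@10.0.0.2:2552", "akka.tcp://app@10.0.0.3:2552");
        // alternates between the two nodes
        for (int i = 0; i < 4; i++) {
            System.out.println(router.route());
        }
    }
}
```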

Remoting events

Just as local actors have lifecycle hooks, Akka declares many events for remote actors. We can listen for and subscribe to the events that occur during remote calls by registering listeners for them on ActorSystem.eventStream. Note that to subscribe to all remoting events you subscribe to RemotingLifecycleEvent; to subscribe only to the association lifecycle, subscribe to akka.remote.AssociationEvent.

  • DisassociatedEvent : an association was terminated; includes the direction of the association and the addresses of both parties.
  • AssociatedEvent : an association was successfully established; includes the direction of the association and the addresses of both parties.
  • AssociationErrorEvent : an association-related error; includes the direction, the addresses of both parties, and the cause of the error.
  • RemotingListenEvent : the remoting subsystem is ready to accept associations.
  • RemotingShutdownEvent : the remoting subsystem has been shut down.
  • RemotingErrorEvent : all remoting-related errors.

Demo

Here is a slightly more involved example: word counting. This is the classic Hadoop starter exercise, re-implemented here with Akka and remoting. The server provides map/reduce-style word counting, while the client provides the text content.
The flow is roughly:

  1. The client's FileReadActor reads the text file.
  2. ClientActor sends the lines to the remote server.
  3. On the server, WCMapReduceActor receives the client's messages and places them into a priority mailbox.
  4. WCMapReduceActor dispatches the received text to MapActor instances for the map step.
  5. The map step sends its results to ReduceActor for the reduce step.
  6. Finally, AggregateActor displays the computed result.

Let's start with the client:

FileReadActor.java

public class FileReadActor extends UntypedActor {

    @Override
    public void onReceive(Object message) throws Exception {
        if (message instanceof String) {
            /* the String message is the file name */
            String fileName = (String) message;
            try {
                BufferedReader reader = new BufferedReader(new InputStreamReader(
                        Thread.currentThread().getContextClassLoader()
                                .getResource(fileName).openStream()));
                String line;
                while ((line = reader.readLine()) != null) {
                    /* send each line back to the sender as one message */
                    getSender().tell(line, null);
                }
                System.out.println("All lines send !");
                /* send an end-of-file marker */
                getSender().tell(String.valueOf("EOF"), null);
            } catch (IOException x) {
                System.err.format("IOException: %s%n", x);
            }
        } else {
            throw new IllegalArgumentException("Unknown message [" + message + "]");
        }
    }
}

It streams the text to the sender through an InputStreamReader, one line per message.

ClientActor.java

public class ClientActor extends UntypedActor {

    private ActorRef remoteServer = null;
    private long start;

    /**
     * @param inRemoteServer
     */
    public ClientActor(ActorRef inRemoteServer) {
        remoteServer = inRemoteServer;
    }

    @Override
    public void onReceive(Object message) throws Exception {
        /* if the message is a String, forward it to the remoteServer actor */
        if (message instanceof String) {
            String msg = (String) message;
            if (message.equals("EOF")) {
                // set the sender to self so we receive the "aggregation done" reply
                remoteServer.tell(msg, getSelf());
            } else {
                remoteServer.tell(msg, null);
            }
        } else if (message instanceof Boolean) {
            System.out.println("Aggregation done");
            // aggregation finished: ask the server to display the result
            remoteServer.tell("DISPLAY_LIST", null);
            // all done, stop this actor
            getContext().stop(self());
        }
    }

    @Override
    public void preStart() {
        /* record the start time */
        start = System.currentTimeMillis();
    }

    @Override
    public void postStop() {
        /* compute the elapsed time */
        // tell the world that the calculation is complete
        long timeSpent = (System.currentTimeMillis() - start);
        System.out.println(String.format(
                "\n\tClientActor estimate: \t\t\n\tCalculation time: \t%s MS", timeSpent));
    }
}

This actor overrides the preStart() and postStop() methods to measure performance.

ClientMain.java

public class ClientMain {

    public static void main(String[] args) throws Exception {
        // the file to count
        final String fileName = "Othello.txt";

        /* build the ActorSystem from configuration */
        ActorSystem system = ActorSystem.create("ClientApplication",
                ConfigFactory.load("client").getConfig("WCMapReduceClientApp"));

        /* look up the remote actor */
        final ActorRef remoteActor = system.actorFor(
                "akka.tcp://WCMapReduceApp@127.0.0.1:2552/user/WCMapReduceActor");

        /* create the file-reading actor */
        final ActorRef fileReadActor = system.actorOf(Props.create(FileReadActor.class));

        /* create the client actor */
        final ActorRef clientActor = system.actorOf(Props.create(ClientActor.class, remoteActor));

        /* send the file name to fileReadActor, with clientActor as the sender/callback actor */
        fileReadActor.tell(fileName, clientActor);
    }
}

This class uses the remote actor lookup described earlier: system.actorFor("akka.tcp://WCMapReduceApp@127.0.0.1:2552/user/WCMapReduceActor") obtains a reference to the remote actor.

client.conf

WCMapReduceClientApp {
  include "common"
  akka {
    actor {
      provider = "akka.remote.RemoteActorRefProvider"
    }
    remote {
      enabled-transports = ["akka.remote.netty.tcp"]
      netty.tcp {
        hostname = "127.0.0.1"
        port = 2553   // 0 means pick any available port
      }
    }
  }
}

Now the server side:

MyPriorityMailBox.java

public class MyPriorityMailBox extends UnboundedPriorityMailbox {

    /**
     * An unbounded priority mailbox that ranks commands, guaranteeing that
     * the DISPLAY_LIST event is handled last.
     */
    public MyPriorityMailBox(ActorSystem.Settings settings, Config config) {
        // Creating a new PriorityGenerator,
        super(new PriorityGenerator() {
            @Override
            public int gen(Object message) {
                if (message.equals("DISPLAY_LIST"))
                    return 2; // DISPLAY_LIST messages should be treated
                              // last if possible
                else if (message.equals(PoisonPill.getInstance()))
                    return 3; // PoisonPill when no other left
                else
                    return 0; // By default they go with high priority
            }
        });
    }
}

This class defines an unbounded priority mailbox, guaranteeing that the DISPLAY_LIST command is handled last; otherwise the result could be displayed before all of the text had even been sent. To use this custom priority mailbox, it must be declared in the configuration:
server.conf

WCMapReduceApp {
  include "common"
  akka {
    actor {
      provider = "akka.remote.RemoteActorRefProvider"
    }
    remote {
      enabled-transports = ["akka.remote.netty.tcp"]
      netty.tcp {
        hostname = "127.0.0.1"
        port = 2552
      }
    }
  }
  priorityMailBox-dispatcher {
    mailbox-type = "cn.sunxiang0918.akka.demo2.server.MyPriorityMailBox"
  }
}
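The effect of the gen() ranking in MyPriorityMailBox can be simulated with a plain PriorityQueue (a sketch, no Akka involved; unlike Akka's mailbox, PriorityQueue does not promise FIFO order among equal-priority entries): lower numbers are served first, so DISPLAY_LIST drains only after the ordinary messages.

```java
import java.util.Comparator;
import java.util.PriorityQueue;

public class MailboxOrderSketch {
    // same ranking idea as MyPriorityMailBox.gen(): DISPLAY_LIST last, everything else first
    static int gen(String message) {
        return message.equals("DISPLAY_LIST") ? 2 : 0;
    }

    public static void main(String[] args) {
        PriorityQueue<String> mailbox =
                new PriorityQueue<>(Comparator.comparingInt(MailboxOrderSketch::gen));
        mailbox.add("DISPLAY_LIST");
        mailbox.add("line one");
        mailbox.add("EOF");
        // DISPLAY_LIST is dequeued only after the other messages
        while (!mailbox.isEmpty()) {
            System.out.println(mailbox.poll());
        }
    }
}
```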

MapActor.java

public class MapActor extends UntypedActor {

    // stop words
    String[] STOP_WORDS = {"a", "about", "above", "above", "across", "after",
            "afterwards", "again", "against", "all", "almost", "alone",
            "along", "already", "also", "although", "always", "am", "among",
            "amongst", "amoungst", "amount", "an", "and", "another", "any",
            "anyhow", "anyone", "anything", "anyway", "anywhere", "are",
            "around", "as", "at", "back", "be", "became", "because", "become",
            "becomes", "becoming", "been", "before", "beforehand", "behind",
            "being", "below", "beside", "besides", "between", "beyond", "bill",
            "both", "bottom", "but", "by", "call", "can", "cannot", "cant",
            "co", "con", "could", "couldnt", "cry", "de", "describe", "detail",
            "do", "done", "down", "due", "during", "each", "eg", "eight",
            "either", "eleven", "else", "elsewhere", "empty", "enough", "etc",
            "even", "ever", "every", "everyone", "everything", "everywhere",
            "except", "few", "fifteen", "fify", "fill", "find", "fire",
            "first", "five", "for", "former", "formerly", "forty", "found",
            "four", "from", "front", "full", "further", "get", "give", "go",
            "had", "has", "hasnt", "have", "he", "hence", "her", "here",
            "hereafter", "hereby", "herein", "hereupon", "hers", "herself",
            "him", "himself", "his", "how", "however", "hundred", "ie", "if",
            "in", "inc", "indeed", "interest", "into", "is", "it", "its",
            "itself", "keep", "last", "latter", "latterly", "least", "less",
            "ltd", "made", "many", "may", "me", "meanwhile", "might", "mill",
            "mine", "more", "moreover", "most", "mostly", "move", "much",
            "must", "my", "myself", "name", "namely", "neither", "never",
            "nevertheless", "next", "nine", "no", "nobody", "none", "noone",
            "nor", "not", "nothing", "now", "nowhere", "of", "off", "often",
            "on", "once", "one", "only", "onto", "or", "other", "others",
            "otherwise", "our", "ours", "ourselves", "out", "over", "own",
            "part", "per", "perhaps", "please", "put", "rather", "re", "same",
            "see", "seem", "seemed", "seeming", "seems", "serious", "several",
            "she", "should", "show", "side", "since", "sincere", "six",
            "sixty", "so", "some", "somehow", "someone", "something",
            "sometime", "sometimes", "somewhere", "still", "such", "system",
            "take", "ten", "than", "that", "the", "their", "them",
            "themselves", "then", "thence", "there", "thereafter", "thereby",
            "therefore", "therein", "thereupon", "these", "they", "thickv",
            "thin", "third", "this", "those", "though", "three", "through",
            "throughout", "thru", "thus", "to", "together", "too", "top",
            "toward", "towards", "twelve", "twenty", "two", "un", "under",
            "until", "up", "upon", "us", "very", "via", "was", "we", "well",
            "were", "what", "whatever", "when", "whence", "whenever", "where",
            "whereafter", "whereas", "whereby", "wherein", "whereupon",
            "wherever", "whether", "which", "while", "whither", "who",
            "whoever", "whole", "whom", "whose", "why", "will", "with",
            "within", "without", "would", "yet", "you", "your", "yours",
            "yourself", "yourselves", "the"};

    List<String> STOP_WORDS_LIST = Arrays.asList(STOP_WORDS);

    /* the reduce actor */
    private ActorRef actor = null;

    public MapActor(ActorRef inReduceActor) {
        actor = inReduceActor;
    }

    @Override
    public void preStart() throws Exception {
        System.out.println("MapActor started: " + Thread.currentThread().getName());
    }

    /**
     * Tokenizes a line and counts its words.
     * @param line
     * @return
     */
    private List<Result> evaluateExpression(String line) {
        List<Result> list = new ArrayList<>();
        /* string tokenizer */
        StringTokenizer parser = new StringTokenizer(line);
        while (parser.hasMoreTokens()) {
            /* keep a token only if it is alphabetic and not a stop word */
            String word = parser.nextToken().toLowerCase();
            if (isAlpha(word) && !STOP_WORDS_LIST.contains(word)) {
                list.add(new Result(word, 1));
            }
        }
        return list;
    }

    /**
     * Checks whether the string is purely alphabetic.
     * @param s
     * @return
     */
    private boolean isAlpha(String s) {
        s = s.toUpperCase();
        for (int i = 0; i < s.length(); i++) {
            int c = (int) s.charAt(i);
            if (c < 65 || c > 90)
                return false;
        }
        return true;
    }

    @Override
    public void onReceive(Object message) throws Exception {
        if (message instanceof String) {
            String work = (String) message;
            if (work.equals("EOF")) {
                /* the input stream is finished */
                actor.tell(true, null);
                return;
            }
            // count the words of this line
            List<Result> list = evaluateExpression(work);
            // send this line's counts to the aggregating ReduceActor
            actor.tell(list, null);
        } else
            throw new IllegalArgumentException("Unknown message [" + message + "]");
    }
}

When this actor receives a message, it checks for the end marker: if found, it tells the ReduceActor that the input is finished; otherwise it counts the words in the line and sends the counts to the ReduceActor.
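The map step itself can be shown in isolation. Below is a self-contained sketch of the same tokenize-filter-emit logic, with a deliberately abbreviated stop-word list (the full list lives in MapActor above) and plain "word=1" strings standing in for the Result class:

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.StringTokenizer;

public class MapStepSketch {
    // abbreviated stand-in for MapActor's STOP_WORDS
    static final List<String> STOP_WORDS = Arrays.asList("the", "a", "of", "and");

    // same check as MapActor.isAlpha: letters only
    static boolean isAlpha(String s) {
        for (int i = 0; i < s.length(); i++) {
            char c = Character.toUpperCase(s.charAt(i));
            if (c < 'A' || c > 'Z') return false;
        }
        return true;
    }

    // map step: one (word, 1) pair per kept token
    static List<String> map(String line) {
        List<String> pairs = new ArrayList<>();
        StringTokenizer parser = new StringTokenizer(line);
        while (parser.hasMoreTokens()) {
            String word = parser.nextToken().toLowerCase();
            if (isAlpha(word) && !STOP_WORDS.contains(word)) {
                pairs.add(word + "=1");
            }
        }
        return pairs;
    }

    public static void main(String[] args) {
        // prints: [moor=1, venice=1]
        System.out.println(map("the Moor of Venice"));
    }
}
```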

ReduceActor.java

public class ReduceActor extends UntypedActor {

    /* the downstream aggregate actor */
    private ActorRef actor = null;

    public ReduceActor(ActorRef inAggregateActor) {
        actor = inAggregateActor;
    }

    @Override
    public void preStart() throws Exception {
        System.out.println("ReduceActor started: " + Thread.currentThread().getName());
    }

    @Override
    public void onReceive(Object message) throws Exception {
        if (message instanceof List) {
            /* cast the result list */
            List<Result> work = (List<Result>) message;
            // first-level reduce of the word list
            NavigableMap<String, Integer> reducedList = reduce(work);
            // send this partial result to the final AggregateActor
            actor.tell(reducedList, null);
        } else if (message instanceof Boolean) {
            // the computation is finished:
            // forward the end marker to the final AggregateActor
            actor.tell(message, null);
        } else
            throw new IllegalArgumentException("Unknown message [" + message + "]");
    }

    /**
     * Counts the occurrences of each word in this batch of results.
     * @param list
     * @return
     */
    private NavigableMap<String, Integer> reduce(List<Result> list) {
        NavigableMap<String, Integer> reducedMap = new ConcurrentSkipListMap<>();
        for (Result result : list) {
            /* if the word already appears in this partial result, increment; otherwise insert */
            if (reducedMap.containsKey(result.getWord())) {
                Integer value = reducedMap.get(result.getWord());
                value++;
                reducedMap.put(result.getWord(), value);
            } else {
                reducedMap.put(result.getWord(), 1);
            }
        }
        return reducedMap;
    }
}

On receiving a message, this actor checks its type: a result list is reduced into per-word counts, which are passed on; a Boolean is the end marker, which is forwarded to tell the aggregate actor that counting has finished.
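The reduce step boils down to folding a list of words into a count map. A minimal standalone sketch, using Map.merge instead of the containsKey/put dance above (same result, shorter), with plain word strings standing in for the Result class:

```java
import java.util.Arrays;
import java.util.List;
import java.util.NavigableMap;
import java.util.TreeMap;

public class ReduceStepSketch {
    // reduce step: fold a list of words into per-word counts
    static NavigableMap<String, Integer> reduce(List<String> words) {
        NavigableMap<String, Integer> counts = new TreeMap<>();
        for (String word : words) {
            // insert 1, or add 1 to the existing count
            counts.merge(word, 1, Integer::sum);
        }
        return counts;
    }

    public static void main(String[] args) {
        // prints: {moor=2, venice=1}
        System.out.println(reduce(Arrays.asList("moor", "venice", "moor")));
    }
}
```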

AggregateActor.java

public class AggregateActor extends UntypedActor {

    /* the final aggregated result */
    private Map<String, Integer> finalReducedMap = new HashMap<>();

    @Override
    public void preStart() throws Exception {
        System.out.println("AggregateActor started: " + Thread.currentThread().getName());
    }

    @Override
    public void onReceive(Object message) throws Exception {
        /* a Map is a partial result: merge it into the final result */
        if (message instanceof Map) {
            Map<String, Integer> reducedList = (Map<String, Integer>) message;
            aggregateInMemoryReduce(reducedList);
        } else if (message instanceof String) {
            /* a String means: print the result */
            if (((String) message).compareTo("DISPLAY_LIST") == 0) {
                //getSender().tell(finalReducedMap.toString());
                System.out.println(finalReducedMap.toString());
            }
        } else if (message instanceof Boolean) {
            /* tell the client that the reduce phase is complete */
            getSender().tell(true, null);
        }
    }

    private void aggregateInMemoryReduce(Map<String, Integer> reducedList) {
        for (String key : reducedList.keySet()) {
            /* accumulate into the final counts */
            if (finalReducedMap.containsKey(key)) {
                Integer count = reducedList.get(key) + finalReducedMap.get(key);
                finalReducedMap.put(key, count);
            } else {
                finalReducedMap.put(key, reducedList.get(key));
            }
        }
    }
}

This actor checks the type of each incoming message: the DISPLAY_LIST marker prints the result; a Boolean means counting is complete, so the client is notified; a Map is the partial result of one map/reduce pass and is merged into the final result.

WCMapReduceActor.java

public class WCMapReduceActor extends UntypedActor {

    private ActorRef mapRouter;
    private ActorRef aggregateActor;

    public WCMapReduceActor(ActorRef inAggregateActor, ActorRef inMapRouter) {
        mapRouter = inMapRouter;
        aggregateActor = inAggregateActor;
    }

    @Override
    public void preStart() throws Exception {
        System.out.println("WCMapReduceActor started: " + Thread.currentThread().getName());
    }

    @Override
    public void onReceive(Object message) throws Exception {
        if (message instanceof String) {
            if (((String) message).compareTo("DISPLAY_LIST") == 0) {
                /* a display request goes straight to the aggregate actor */
                System.out.println("Got Display Message");
                aggregateActor.tell(message, getSender());
            } else if (message.equals("EOF")) {
                // the client has sent everything
                aggregateActor.tell(true, getSender());
            } else {
                /* otherwise hand the line to the map router for counting */
                mapRouter.tell(message, null);
            }
        }
    }
}

This is the coordinating, message-dispatching actor: it fronts the other actors and is the entry point for all messages.

WCMapReduceServer.java

public class WCMapReduceServer {

    private ActorRef mapRouter;
    private ActorRef reduceRouter;
    private ActorRef aggregateActor;
    private ActorRef wcMapReduceActor;

    public WCMapReduceServer(int no_of_reduce_workers, int no_of_map_workers) {
        /* create the actor system */
        ActorSystem system = ActorSystem.create("WCMapReduceApp",
                ConfigFactory.load("application").getConfig("WCMapReduceApp"));
        // create the aggregate actor
        aggregateActor = system.actorOf(Props.create(AggregateActor.class));
        // create a pool of reduce actors
        reduceRouter = system.actorOf(Props.create(ReduceActor.class, aggregateActor)
                .withRouter(new RoundRobinPool(no_of_reduce_workers)));
        // create a pool of map actors
        mapRouter = system.actorOf(Props.create(MapActor.class, reduceRouter)
                .withRouter(new RoundRobinPool(no_of_map_workers)));
        // create the overall WCMapReduce Actor that acts as the remote actor
        // for clients
        Props props = Props.create(WCMapReduceActor.class, aggregateActor, mapRouter)
                .withDispatcher("priorityMailBox-dispatcher");
        wcMapReduceActor = system.actorOf(props, "WCMapReduceActor");
    }

    /**
     * @param args
     */
    public static void main(String[] args) {
        new WCMapReduceServer(50, 50);
    }
}

This is the server's entry point: it sets up a word-counting service with 50 map and 50 reduce actors and distributes the counting tasks received from the client round-robin.

That is all the code of the demo. Running the program prints the following to the console:

Server side:

Got Display Message
{minerals=5, half=35, exceeding=5, spoke=25, mince=5, hall=5, disproportion=5, youth=20, guards=5, wreck=5, begins=10, approved=20, imperfect=5, drunk=15, framed=10, pick=5,......}

Client side:

All lines send !
Aggregation done
	ClientActor estimate: 		
	Calculation time: 	467 MS

With that, we have implemented a simple map/reduce computation over Akka remoting.

Clustering

How it works

Besides remoting, Akka also provides a decentralized, P2P-based cluster service with no single point of failure. Akka clustering is built on the Gossip protocol: it supports automatic failure detection, can automatically discover member nodes that have left the cluster because of problems, and propagates state to all other member nodes in an event-driven way. Gossip is a peer-to-peer communication protocol inspired by how rumors spread in social networks; it solves problems, such as single points of failure, that other approaches cannot handle at very large cluster scales.

An Akka cluster consists of a set of member nodes. Each member node is uniquely identified by hostname:port:uid, and the member nodes are fully decoupled from one another.

Node states

Akka defines six states for cluster members, together with a state transition matrix. The six states are:

  • Joining : the node is joining the cluster
  • Up : the node is up and serving normally
  • Leaving : the node is leaving the cluster
  • Down : the node has been taken out of service
  • Exiting : the node is exiting the cluster
  • Removed : the node has been removed from the cluster

Every member node of an Akka cluster is in exactly one of these six states, and a member state event is emitted whenever a node's state changes. Note that from any state other than Down and Removed, a node can transition to Down at any moment, i.e. fail and stop serving. A node in the Down state that wants to rejoin the cluster must be restarted, re-enter the Joining state, and then go through the subsequent transitions to join the cluster again.
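The state machine just described can be sketched as a transition table. This is a simplified reading of the transition rules stated above (forward path plus "any live node can crash to Down"), not the official Akka matrix:

```java
import java.util.EnumSet;
import java.util.HashMap;
import java.util.Map;

public class MemberStateSketch {
    enum State { JOINING, UP, LEAVING, DOWN, EXITING, REMOVED }

    // allowed transitions: the normal forward path, plus the rule that
    // every state except DOWN and REMOVED may fall to DOWN at any time
    static final Map<State, EnumSet<State>> NEXT = new HashMap<>();
    static {
        NEXT.put(State.JOINING, EnumSet.of(State.UP, State.DOWN));
        NEXT.put(State.UP, EnumSet.of(State.LEAVING, State.DOWN));
        NEXT.put(State.LEAVING, EnumSet.of(State.EXITING, State.DOWN));
        NEXT.put(State.EXITING, EnumSet.of(State.REMOVED, State.DOWN));
        NEXT.put(State.DOWN, EnumSet.of(State.REMOVED));
        NEXT.put(State.REMOVED, EnumSet.noneOf(State.class));
    }

    static boolean canTransition(State from, State to) {
        return NEXT.get(from).contains(to);
    }

    public static void main(String[] args) {
        // prints: true (a live node can crash)
        System.out.println(canTransition(State.UP, State.DOWN));
        // prints: false (a removed node cannot come back without restarting)
        System.out.println(canTransition(State.REMOVED, State.UP));
    }
}
```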

Failure detection

In an Akka cluster, each member node is monitored by a group of other nodes (five by default). This group uses heartbeats to detect whether the monitored node has become Unreachable; if it is unreachable, the group spreads the node's Unreachable state to all other cluster nodes, so that eventually every member knows the monitored node has failed.

Any member node may become the cluster's Leader. This is a deterministic outcome of Gossip convergence, not the result of an election, which avoids a single point of failure. In an Akka cluster, Leader is just a role, and it may change from one round of Gossip convergence to the next. The Leader's job is to move members into and out of the cluster. A member starts in the Joining state; once all other nodes have seen the new node, the Leader sets its state to Up. If a node leaves the cluster gracefully, its state becomes Leaving; when the Leader sees the Leaving state, it changes it to Exiting and notifies all other nodes; once all nodes have seen the Exiting state, the Leader removes the node, setting its state to Removed.

Configuration

To use Akka clustering in a project, first add akka-cluster to the project's Maven dependencies:

<dependency>
    <groupId>com.typesafe.akka</groupId>
    <artifactId>akka-cluster_2.11</artifactId>
    <version>2.4.1</version>
</dependency>

Then configure the necessary settings in application.conf:

akka {
  actor {
    # the actor provider is the cluster
    provider = "akka.cluster.ClusterActorRefProvider"
  }
  # as with akka-remoting, declares the remote transport
  remote {
    log-remote-lifecycle-events = off
    netty.tcp {
      hostname = "127.0.0.1"
      port = 0
    }
  }
  # cluster-specific settings
  cluster {
    # seed nodes
    seed-nodes = [
      "akka.tcp://ClusterSystem@127.0.0.1:2551",
      "akka.tcp://ClusterSystem@127.0.0.1:2552"]
    # automatically mark unreachable member nodes as down
    auto-down-unreachable-after = 10s
  }
}

The part of this configuration that deserves special attention is the seed nodes, a special node role in an Akka cluster.
Seed nodes primarily serve as the initial contact and join points of the cluster, and they also act as intermediaries for the other nodes. To start an Akka cluster, you must configure a series of seed nodes: nodes whose addresses are known in advance. When a node joins, the join only counts as successful once a seed node has returned its confirmation.

For more cluster settings, see: config-akka-cluster

Cluster events

As described above, when nodes change state, the Leader sends state events to all member nodes of the cluster, so receiving and handling these events is also very important:

  • MemberUp : a member node came up
  • MemberExited : a member node exited
  • MemberRemoved : a member node was removed
  • UnreachableMember : a member node became unreachable
  • ReachableMember : a member node became reachable again
  • LeaderChanged : the Leader changed
  • RoleLeaderChanged : a role Leader changed
  • ClusterShuttingDown : the cluster is shutting down
  • ClusterMetricsChanged : the cluster metrics changed

To see these state changes in action, look at the official minimal Akka cluster demo: it shows both how to build a minimal Akka cluster and how these event states change.

Cluster events demo

The official documentation contains a minimal Akka cluster example: it starts three Akka nodes, listens for all member events, and prints each event as it arrives.

demo6.conf

akka {
  actor {
    provider = "akka.cluster.ClusterActorRefProvider"
  }
  remote {
    log-remote-lifecycle-events = off
    netty.tcp {
      hostname = "127.0.0.1"
      port = 0
    }
  }
  cluster {
    seed-nodes = [
      # start two seed nodes first, so events are announced to them when new nodes join
      "akka.tcp://ClusterSystem@127.0.0.1:2551",
      "akka.tcp://ClusterSystem@127.0.0.1:2552"]
    auto-down-unreachable-after = 10s
  }
}

SimpleClusterListener.java

public class SimpleClusterListener extends UntypedActor {

    /* logger */
    LoggingAdapter log = Logging.getLogger(getContext().system(), this);

    /* obtain the cluster of this actor system */
    Cluster cluster = Cluster.get(getContext().system());

    // subscribe to cluster events
    @Override
    public void preStart() {
        //region subscribe
        cluster.subscribe(getSelf(), ClusterEvent.initialStateAsEvents(),
                MemberEvent.class, UnreachableMember.class);
        //endregion
    }

    // re-subscribe when restart
    @Override
    public void postStop() {
        cluster.unsubscribe(getSelf());
    }

    @Override
    public void onReceive(Object message) {
        /* print a different message for each kind of event */
        if (message instanceof MemberUp) {
            MemberUp mUp = (MemberUp) message;
            log.info("Member is Up: {}", mUp.member());
        } else if (message instanceof UnreachableMember) {
            UnreachableMember mUnreachable = (UnreachableMember) message;
            log.info("Member detected as unreachable: {}", mUnreachable.member());
        } else if (message instanceof MemberRemoved) {
            MemberRemoved mRemoved = (MemberRemoved) message;
            log.info("Member is Removed: {}", mRemoved.member());
        } else if (message instanceof MemberEvent) {
            // ignore
            log.info("Member Event: {}", ((MemberEvent) message).member());
        } else {
            unhandled(message);
        }
    }
}

SimpleClusterApp.java

public class SimpleClusterApp {

    public static void main(String[] args) {
        if (args.length == 0)
            /* start three nodes */
            startup(new String[]{"2551", "2552", "0"});
        else
            startup(args);
    }

    public static void startup(String[] ports) {
        for (String port : ports) {
            // override the remote port from the configuration
            Config config = ConfigFactory.parseString("akka.remote.netty.tcp.port=" + port)
                    .withFallback(ConfigFactory.load("demo6"));
            // create the ActorSystem; the name must match the one in the conf file
            ActorSystem system = ActorSystem.create("ClusterSystem", config);
            // create the cluster actor that listens for the events
            system.actorOf(Props.create(SimpleClusterListener.class), "clusterListener");
        }
    }
}

This simple example starts three nodes and creates an actor that listens for the various cluster events. Running the demo prints to the console:

[INFO] [02/11/2016 22:14:42.051] [main] [akka.remote.Remoting] Starting remoting
[INFO] [02/11/2016 22:14:42.347] [main] [akka.remote.Remoting] Remoting started; listening on addresses :[akka.tcp://ClusterSystem@127.0.0.1:2551]
[INFO] [02/11/2016 22:14:42.356] [main] [akka.cluster.Cluster(akka://ClusterSystem)] Cluster Node [akka.tcp://ClusterSystem@127.0.0.1:2551] - Starting up...
[INFO] [02/11/2016 22:14:42.425] [main] [akka.cluster.Cluster(akka://ClusterSystem)] Cluster Node [akka.tcp://ClusterSystem@127.0.0.1:2551] - Registered cluster JMX MBean [akka:type=Cluster]
[INFO] [02/11/2016 22:14:42.425] [main] [akka.cluster.Cluster(akka://ClusterSystem)] Cluster Node [akka.tcp://ClusterSystem@127.0.0.1:2551] - Started up successfully
[INFO] [02/11/2016 22:14:42.429] [ClusterSystem-akka.actor.default-dispatcher-14] [akka.cluster.Cluster(akka://ClusterSystem)] Cluster Node [akka.tcp://ClusterSystem@127.0.0.1:2551] - Metrics will be retreived from MBeans, and may be incorrect on some platforms. To increase metric accuracy add the 'sigar.jar' to the classpath and the appropriate platform-specific native libary to 'java.library.path'. Reason: java.lang.ClassNotFoundException: org.hyperic.sigar.Sigar
[INFO] [02/11/2016 22:14:42.432] [ClusterSystem-akka.actor.default-dispatcher-14] [akka.cluster.Cluster(akka://ClusterSystem)] Cluster Node [akka.tcp://ClusterSystem@127.0.0.1:2551] - Metrics collection has started successfully
[INFO] [02/11/2016 22:14:42.454] [main] [akka.remote.Remoting] Starting remoting
[INFO] [02/11/2016 22:14:42.460] [main] [akka.remote.Remoting] Remoting started; listening on addresses :[akka.tcp://ClusterSystem@127.0.0.1:2552]
[INFO] [02/11/2016 22:14:42.461] [main] [akka.cluster.Cluster(akka://ClusterSystem)] Cluster Node [akka.tcp://ClusterSystem@127.0.0.1:2552] - Starting up...
[INFO] [02/11/2016 22:14:42.464] [main] [akka.cluster.Cluster(akka://ClusterSystem)] Cluster Node [akka.tcp://ClusterSystem@127.0.0.1:2552] - Started up successfully
[INFO] [02/11/2016 22:14:42.464] [ClusterSystem-akka.actor.default-dispatcher-4] [akka.cluster.Cluster(akka://ClusterSystem)] Cluster Node [akka.tcp://ClusterSystem@127.0.0.1:2552] - Metrics will be retreived from MBeans, and may be incorrect on some platforms. To increase metric accuracy add the 'sigar.jar' to the classpath and the appropriate platform-specific native libary to 'java.library.path'. Reason: java.lang.ClassNotFoundException: org.hyperic.sigar.Sigar
[INFO] [02/11/2016 22:14:42.464] [ClusterSystem-akka.actor.default-dispatcher-4] [akka.cluster.Cluster(akka://ClusterSystem)] Cluster Node [akka.tcp://ClusterSystem@127.0.0.1:2552] - Metrics collection has started successfully
[INFO] [02/11/2016 22:14:42.486] [main] [akka.remote.Remoting] Starting remoting
[INFO] [02/11/2016 22:14:42.491] [main] [akka.remote.Remoting] Remoting started; listening on addresses :[akka.tcp://ClusterSystem@127.0.0.1:57726]
[INFO] [02/11/2016 22:14:42.492] [main] [akka.cluster.Cluster(akka://ClusterSystem)] Cluster Node [akka.tcp://ClusterSystem@127.0.0.1:57726] - Starting up...
[INFO] [02/11/2016 22:14:42.494] [main] [akka.cluster.Cluster(akka://ClusterSystem)] Cluster Node [akka.tcp://ClusterSystem@127.0.0.1:57726] - Started up successfully
[INFO] [02/11/2016 22:14:42.494] [ClusterSystem-akka.actor.default-dispatcher-3] [akka.cluster.Cluster(akka://ClusterSystem)] Cluster Node [akka.tcp://ClusterSystem@127.0.0.1:57726] - Metrics will be retreived from MBeans, and may be incorrect on some platforms. To increase metric accuracy add the 'sigar.jar' to the classpath and the appropriate platform-specific native libary to 'java.library.path'. Reason: java.lang.ClassNotFoundException: org.hyperic.sigar.Sigar
[INFO] [02/11/2016 22:14:42.494] [ClusterSystem-akka.actor.default-dispatcher-3] [akka.cluster.Cluster(akka://ClusterSystem)] Cluster Node [akka.tcp://ClusterSystem@127.0.0.1:57726] - Metrics collection has started successfully
[INFO] [02/11/2016 22:14:42.645] [ClusterSystem-akka.actor.default-dispatcher-4] [akka.cluster.Cluster(akka://ClusterSystem)] Cluster Node [akka.tcp://ClusterSystem@127.0.0.1:2551] - Node [akka.tcp://ClusterSystem@127.0.0.1:2551] is JOINING, roles []
[INFO] [02/11/2016 22:14:42.665] [ClusterSystem-akka.actor.default-dispatcher-4] [akka.cluster.Cluster(akka://ClusterSystem)] Cluster Node [akka.tcp://ClusterSystem@127.0.0.1:2551] - Leader is moving node [akka.tcp://ClusterSystem@127.0.0.1:2551] to [Up]
[INFO] [02/11/2016 22:14:42.672] [ClusterSystem-akka.actor.default-dispatcher-16] [akka://ClusterSystem/user/clusterListener] Member is Up: Member(address = akka.tcp://ClusterSystem@127.0.0.1:2551, status = Up)
[INFO] [02/11/2016 22:14:47.662] [ClusterSystem-akka.actor.default-dispatcher-3] [akka.cluster.Cluster(akka://ClusterSystem)] Cluster Node [akka.tcp://ClusterSystem@127.0.0.1:2551] - Node [akka.tcp://ClusterSystem@127.0.0.1:2552] is JOINING, roles []
[INFO] [02/11/2016 22:14:47.662] [ClusterSystem-akka.actor.default-dispatcher-3] [akka.cluster.Cluster(akka://ClusterSystem)] Cluster Node [akka.tcp://ClusterSystem@127.0.0.1:2551] - Node [akka.tcp://ClusterSystem@127.0.0.1:57726] is JOINING, roles []
[INFO] [02/11/2016 22:14:47.665] [ClusterSystem-akka.actor.default-dispatcher-18] [akka://ClusterSystem/user/clusterListener] Member Event: Member(address = akka.tcp://ClusterSystem@127.0.0.1:2552, status = Joining)
[INFO] [02/11/2016 22:14:47.666] [ClusterSystem-akka.actor.default-dispatcher-4] [akka://ClusterSystem/user/clusterListener] Member Event: Member(address = akka.tcp://ClusterSystem@127.0.0.1:57726, status = Joining)
[INFO] [02/11/2016 22:14:47.731] [ClusterSystem-akka.actor.default-dispatcher-21] [akka.cluster.Cluster(akka://ClusterSystem)] Cluster Node [akka.tcp://ClusterSystem@127.0.0.1:57726] - Welcome from [akka.tcp://ClusterSystem@127.0.0.1:2551]
[INFO] [02/11/2016 22:14:47.731] [ClusterSystem-akka.actor.default-dispatcher-17] [akka.cluster.Cluster(akka://ClusterSystem)] Cluster Node [akka.tcp://ClusterSystem@127.0.0.1:2552] - Welcome from [akka.tcp://ClusterSystem@127.0.0.1:2551]
[INFO] [02/11/2016 22:14:47.731] [ClusterSystem-akka.actor.default-dispatcher-2] [akka://ClusterSystem/user/clusterListener] Member is Up: Member(address = akka.tcp://ClusterSystem@127.0.0.1:2551, status = Up)
[INFO] [02/11/2016 22:14:47.732] [ClusterSystem-akka.actor.default-dispatcher-2] [akka://ClusterSystem/user/clusterListener] Member Event: Member(address = akka.tcp://ClusterSystem@127.0.0.1:2552, status = Joining)
[INFO] [02/11/2016 22:14:47.732] [ClusterSystem-akka.actor.default-dispatcher-2] [akka://ClusterSystem/user/clusterListener] Member Event: Member(address = akka.tcp://ClusterSystem@127.0.0.1:57726, status = Joining)
[INFO] [02/11/2016 22:14:47.733] [ClusterSystem-akka.actor.default-dispatcher-5] [akka://ClusterSystem/user/clusterListener] Member is Up: Member(address = akka.tcp://ClusterSystem@127.0.0.1:2551, status = Up)
[INFO] [02/11/2016 22:14:47.733] [ClusterSystem-akka.actor.default-dispatcher-5] [akka://ClusterSystem/user/clusterListener] Member Event: Member(address = akka.tcp://ClusterSystem@127.0.0.1:2552, status = Joining)
[INFO] [02/11/2016 22:14:47.745] [ClusterSystem-akka.actor.default-dispatcher-18] [akka://ClusterSystem/user/clusterListener] Member Event: Member(address = akka.tcp://ClusterSystem@127.0.0.1:57726, status = Joining)
[INFO] [02/11/2016 22:14:48.464] [ClusterSystem-akka.actor.default-dispatcher-15] [akka.cluster.Cluster(akka://ClusterSystem)] Cluster Node [akka.tcp://ClusterSystem@127.0.0.1:2551] - Leader is moving node [akka.tcp://ClusterSystem@127.0.0.1:2552] to [Up]
[INFO] [02/11/2016 22:14:48.464] [ClusterSystem-akka.actor.default-dispatcher-15] [akka.cluster.Cluster(akka://ClusterSystem)] Cluster Node [akka.tcp://ClusterSystem@127.0.0.1:2551] - Leader is moving node [akka.tcp://ClusterSystem@127.0.0.1:57726] to [Up]
[INFO] [02/11/2016 22:14:48.465] [ClusterSystem-akka.actor.default-dispatcher-15] [akka://ClusterSystem/user/clusterListener] Member is Up: Member(address = akka.tcp://ClusterSystem@127.0.0.1:2552, status = Up)
[INFO] [02/11/2016 22:14:48.465] [ClusterSystem-akka.actor.default-dispatcher-15] [akka://ClusterSystem/user/clusterListener] Member is Up: Member(address = akka.tcp://ClusterSystem@127.0.0.1:57726, status = Up)
[INFO] [02/11/2016 22:14:49.463] [ClusterSystem-akka.actor.default-dispatcher-16] [akka://ClusterSystem/user/clusterListener] Member is Up: Member(address = akka.tcp://ClusterSystem@127.0.0.1:2552, status = Up)
[INFO] [02/11/2016 22:14:49.463] [ClusterSystem-akka.actor.default-dispatcher-16] [akka://ClusterSystem/user/clusterListener] Member is Up: Member(address = akka.tcp://ClusterSystem@127.0.0.1:57726, status = Up)
[INFO] [02/11/2016 22:14:49.481] [ClusterSystem-akka.actor.default-dispatcher-21] [akka://ClusterSystem/user/clusterListener] Member is Up: Member(address = akka.tcp://ClusterSystem@127.0.0.1:2552, status = Up)
[INFO] [02/11/2016 22:14:49.481] [ClusterSystem-akka.actor.default-dispatcher-21] [akka://ClusterSystem/user/clusterListener] Member is Up: Member(address = akka.tcp://ClusterSystem@127.0.0.1:57726, status = Up)

The various membership state transitions (Joining, Up, and so on) can be traced directly in this log output.

Cluster in Practice

Factorial Service Demo

This demo uses a simple factorial computation to show Akka-Cluster in action.
It is split into a frontend and a backend: the frontend only submits the factorial size and prints the computed results, while the backend nodes do the actual factorial computation.

demo7.conf

include "demo6"

# //#min-nr-of-members
akka.cluster.min-nr-of-members = 3
# //#min-nr-of-members

# //#role-min-nr-of-members
akka.cluster.role {
  frontend.min-nr-of-members = 1
  backend.min-nr-of-members = 2
}
# //#role-min-nr-of-members

# //#adaptive-router
akka.actor.deployment {
  /factorialFrontend/factorialBackendRouter = {
    router = adaptive-group
    # metrics-selector = heap
    # metrics-selector = load
    # metrics-selector = cpu
    metrics-selector = mix
    nr-of-instances = 100
    routees.paths = ["/user/factorialBackend"]
    cluster {
      enabled = on
      use-role = backend
      allow-local-routees = off
    }
  }
}
# //#adaptive-router

FactorialResult.java

import java.io.Serializable;
import java.math.BigInteger;

public class FactorialResult implements Serializable {

    public final int n;
    public final BigInteger factorial;

    FactorialResult(int n, BigInteger factorial) {
        this.n = n;
        this.factorial = factorial;
    }

    @Override
    public String toString() {
        return "FactorialResult{" +
                "n=" + n +
                ", factorial=" + factorial +
                '}';
    }
}

FactorialBackend.java

import java.math.BigInteger;

import akka.actor.UntypedActor;
import akka.dispatch.Mapper;
import scala.concurrent.Future;

import static akka.dispatch.Futures.future;
import static akka.pattern.Patterns.pipe;

public class FactorialBackend extends UntypedActor {

    @Override
    public void onReceive(Object message) {
        // only numbers are accepted as jobs
        if (message instanceof Integer) {
            final Integer n = (Integer) message;

            /* compute the factorial asynchronously using Akka futures */
            Future<BigInteger> f = future(() -> factorial(n), getContext().dispatcher());

            /* map the raw BigInteger into a FactorialResult */
            Future<FactorialResult> result = f.map(new Mapper<BigInteger, FactorialResult>() {
                public FactorialResult apply(BigInteger factorial) {
                    return new FactorialResult(n, factorial);
                }
            }, getContext().dispatcher());

            /* pipe the result back to the sender */
            pipe(result, getContext().dispatcher()).to(getSender());
        } else {
            unhandled(message);
        }
    }

    /**
     * Compute the factorial of n.
     */
    BigInteger factorial(int n) {
        BigInteger acc = BigInteger.ONE;
        for (int i = 1; i <= n; ++i) {
            acc = acc.multiply(BigInteger.valueOf(i));
        }
        return acc;
    }
}

FactorialBackendMain.java

import akka.actor.ActorSystem;
import akka.actor.Props;
import com.typesafe.config.Config;
import com.typesafe.config.ConfigFactory;

public class FactorialBackendMain {

    public static void main(String[] args) {
        // override the cluster role and port from the config file
        final String port = args.length > 0 ? args[0] : "0";
        final Config config = ConfigFactory.parseString("akka.remote.netty.tcp.port=" + port)
                .withFallback(ConfigFactory.parseString("akka.cluster.roles = [backend]"))
                .withFallback(ConfigFactory.load("demo7"));

        ActorSystem system = ActorSystem.create("ClusterSystem", config);

        system.actorOf(Props.create(FactorialBackend.class), "factorialBackend");
    }
}
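The withFallback layering above means the programmatically parsed port and role win over anything in demo7.conf, while unset keys fall through to the file. The same first-match-wins lookup can be illustrated with plain java.util.Properties — an analogy only, not the Typesafe Config API:

```java
import java.util.Properties;

public class FallbackSketch {

    // Layer an override on top of file defaults,
    // like parseString(...).withFallback(ConfigFactory.load(...)).
    static Properties layered() {
        Properties fileConfig = new Properties();          // plays the role of demo7.conf
        fileConfig.setProperty("akka.remote.netty.tcp.port", "0");
        fileConfig.setProperty("akka.cluster.min-nr-of-members", "3");

        Properties overrides = new Properties(fileConfig); // plays the role of parseString(...)
        overrides.setProperty("akka.remote.netty.tcp.port", "2551");
        return overrides;
    }

    public static void main(String[] args) {
        Properties config = layered();
        // the explicit override wins; unset keys fall back to the file defaults
        System.out.println(config.getProperty("akka.remote.netty.tcp.port"));     // 2551
        System.out.println(config.getProperty("akka.cluster.min-nr-of-members")); // 3
    }
}
```

This is why FactorialApp below can start several backends from the same config: each process only overrides the port, and everything else comes from demo7.conf.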

FactorialFrontend.java

import java.util.concurrent.TimeUnit;

import akka.actor.ActorRef;
import akka.actor.ReceiveTimeout;
import akka.actor.UntypedActor;
import akka.event.Logging;
import akka.event.LoggingAdapter;
import akka.routing.FromConfig;
import scala.concurrent.duration.Duration;

public class FactorialFrontend extends UntypedActor {

    final int upToN;       // compute factorials up to this n
    final boolean repeat;  // whether to repeat the batch after it completes

    LoggingAdapter log = Logging.getLogger(getContext().system(), this);

    /* obtain the router over the backend nodes, configured in demo7.conf */
    ActorRef backend = getContext().actorOf(FromConfig.getInstance().props(),
            "factorialBackendRouter");

    public FactorialFrontend(int upToN, boolean repeat) {
        this.upToN = upToN;
        this.repeat = repeat;
    }

    @Override
    public void preStart() {
        // this first batch is sent before any routees have joined the router,
        // so it is bound to time out
        sendJobs();
        getContext().setReceiveTimeout(Duration.create(10, TimeUnit.SECONDS));
    }

    @Override
    public void onReceive(Object message) {
        if (message instanceof FactorialResult) {
            FactorialResult result = (FactorialResult) message;
            if (result.n == upToN) {
                System.out.println("Result: " + result);
                if (repeat)
                    sendJobs();
                else
                    getContext().stop(getSelf());
            }
        } else if (message instanceof ReceiveTimeout) {
            log.info("Timeout");
            sendJobs();
        } else {
            unhandled(message);
        }
    }

    void sendJobs() {
        log.info("Starting batch of factorials up to [{}]", upToN);
        for (int n = 1; n <= upToN; n++) {
            backend.tell(n, getSelf());
        }
    }
}

FactorialFrontendMain.java

import akka.actor.ActorSystem;
import akka.actor.Props;
import akka.cluster.Cluster;
import com.typesafe.config.Config;
import com.typesafe.config.ConfigFactory;

public class FactorialFrontendMain {

    public static void main(String[] args) {
        final int upToN = 10;

        final Config config = ConfigFactory.parseString("akka.cluster.roles = [frontend]")
                .withFallback(ConfigFactory.load("demo7"));

        final ActorSystem system = ActorSystem.create("ClusterSystem", config);
        system.log().info("Factorials will start when 2 backend members in the cluster.");

        //#registerOnUp
        Cluster.get(system).registerOnMemberUp((Runnable) () ->
                system.actorOf(Props.create(FactorialFrontend.class, upToN, false),
                        "factorialFrontend"));
        //#registerOnUp
    }
}

FactorialApp.java

public class FactorialApp {

    public static void main(String[] args) {
        // start 3 backend nodes and 1 frontend node
        FactorialBackendMain.main(new String[] { "2551" });
        FactorialBackendMain.main(new String[] { "2552" });
        FactorialBackendMain.main(new String[0]);
        FactorialFrontendMain.main(new String[0]);
    }
}

Running FactorialApp.java launches the whole demo: it starts three backend nodes and one frontend node.
Once the frontend node is up, it creates the FactorialFrontend actor, which sends the numbers to compute to the backend nodes, then receives and prints the results. On the backend nodes, the FactorialBackend actor computes the factorials and returns the results.

Running this demo prints the following to the console:

...
...
[INFO] [02/11/2016 22:45:04.580] [ClusterSystem-akka.actor.default-dispatcher-17] [akka://ClusterSystem/user/factorialFrontend] Starting batch of factorials up to [10]
[INFO] [02/11/2016 22:45:04.583] [ClusterSystem-akka.actor.default-dispatcher-2] [akka://ClusterSystem/deadLetters] Message [java.lang.Integer] from Actor[akka://ClusterSystem/user/factorialFrontend#1317512767] to Actor[akka://ClusterSystem/deadLetters] was not delivered. [1] dead letters encountered. This logging can be turned off or adjusted with configuration settings 'akka.log-dead-letters' and 'akka.log-dead-letters-during-shutdown'.
[INFO] [02/11/2016 22:45:04.583] [ClusterSystem-akka.actor.default-dispatcher-2] [akka://ClusterSystem/deadLetters] Message [java.lang.Integer] from Actor[akka://ClusterSystem/user/factorialFrontend#1317512767] to Actor[akka://ClusterSystem/deadLetters] was not delivered. [2] dead letters encountered. This logging can be turned off or adjusted with configuration settings 'akka.log-dead-letters' and 'akka.log-dead-letters-during-shutdown'.
[INFO] [02/11/2016 22:45:04.583] [ClusterSystem-akka.actor.default-dispatcher-2] [akka://ClusterSystem/deadLetters] Message [java.lang.Integer] from Actor[akka://ClusterSystem/user/factorialFrontend#1317512767] to Actor[akka://ClusterSystem/deadLetters] was not delivered. [3] dead letters encountered. This logging can be turned off or adjusted with configuration settings 'akka.log-dead-letters' and 'akka.log-dead-letters-during-shutdown'.
[INFO] [02/11/2016 22:45:04.583] [ClusterSystem-akka.actor.default-dispatcher-2] [akka://ClusterSystem/deadLetters] Message [java.lang.Integer] from Actor[akka://ClusterSystem/user/factorialFrontend#1317512767] to Actor[akka://ClusterSystem/deadLetters] was not delivered. [4] dead letters encountered. This logging can be turned off or adjusted with configuration settings 'akka.log-dead-letters' and 'akka.log-dead-letters-during-shutdown'.
[INFO] [02/11/2016 22:45:04.583] [ClusterSystem-akka.actor.default-dispatcher-2] [akka://ClusterSystem/deadLetters] Message [java.lang.Integer] from Actor[akka://ClusterSystem/user/factorialFrontend#1317512767] to Actor[akka://ClusterSystem/deadLetters] was not delivered. [5] dead letters encountered. This logging can be turned off or adjusted with configuration settings 'akka.log-dead-letters' and 'akka.log-dead-letters-during-shutdown'.
[INFO] [02/11/2016 22:45:04.583] [ClusterSystem-akka.actor.default-dispatcher-2] [akka://ClusterSystem/deadLetters] Message [java.lang.Integer] from Actor[akka://ClusterSystem/user/factorialFrontend#1317512767] to Actor[akka://ClusterSystem/deadLetters] was not delivered. [6] dead letters encountered. This logging can be turned off or adjusted with configuration settings 'akka.log-dead-letters' and 'akka.log-dead-letters-during-shutdown'.
[INFO] [02/11/2016 22:45:04.584] [ClusterSystem-akka.actor.default-dispatcher-2] [akka://ClusterSystem/deadLetters] Message [java.lang.Integer] from Actor[akka://ClusterSystem/user/factorialFrontend#1317512767] to Actor[akka://ClusterSystem/deadLetters] was not delivered. [7] dead letters encountered. This logging can be turned off or adjusted with configuration settings 'akka.log-dead-letters' and 'akka.log-dead-letters-during-shutdown'.
[INFO] [02/11/2016 22:45:04.584] [ClusterSystem-akka.actor.default-dispatcher-2] [akka://ClusterSystem/deadLetters] Message [java.lang.Integer] from Actor[akka://ClusterSystem/user/factorialFrontend#1317512767] to Actor[akka://ClusterSystem/deadLetters] was not delivered. [8] dead letters encountered. This logging can be turned off or adjusted with configuration settings 'akka.log-dead-letters' and 'akka.log-dead-letters-during-shutdown'.
[INFO] [02/11/2016 22:45:04.584] [ClusterSystem-akka.actor.default-dispatcher-2] [akka://ClusterSystem/deadLetters] Message [java.lang.Integer] from Actor[akka://ClusterSystem/user/factorialFrontend#1317512767] to Actor[akka://ClusterSystem/deadLetters] was not delivered. [9] dead letters encountered. This logging can be turned off or adjusted with configuration settings 'akka.log-dead-letters' and 'akka.log-dead-letters-during-shutdown'.
[INFO] [02/11/2016 22:45:04.584] [ClusterSystem-akka.actor.default-dispatcher-2] [akka://ClusterSystem/deadLetters] Message [java.lang.Integer] from Actor[akka://ClusterSystem/user/factorialFrontend#1317512767] to Actor[akka://ClusterSystem/deadLetters] was not delivered. [10] dead letters encountered, no more dead letters will be logged. This logging can be turned off or adjusted with configuration settings 'akka.log-dead-letters' and 'akka.log-dead-letters-during-shutdown'.
[INFO] [02/11/2016 22:45:14.600] [ClusterSystem-akka.actor.default-dispatcher-7] [akka://ClusterSystem/user/factorialFrontend] Timeout
[INFO] [02/11/2016 22:45:14.601] [ClusterSystem-akka.actor.default-dispatcher-7] [akka://ClusterSystem/user/factorialFrontend] Starting batch of factorials up to [10]
[WARN] [02/11/2016 22:45:14.607] [ClusterSystem-akka.remote.default-remote-dispatcher-8] [akka.serialization.Serialization(akka://ClusterSystem)] Using the default Java serializer for class [java.lang.Integer] which is not recommended because of performance implications. Use another serializer or disable this warning using the setting 'akka.actor.warn-about-java-serializer-usage'
[WARN] [02/11/2016 22:45:14.622] [ClusterSystem-akka.remote.default-remote-dispatcher-5] [akka.serialization.Serialization(akka://ClusterSystem)] Using the default Java serializer for class [cn.sunxiang0918.akka.demo7.FactorialResult] which is not recommended because of performance implications. Use another serializer or disable this warning using the setting 'akka.actor.warn-about-java-serializer-usage'
[WARN] [02/11/2016 22:45:14.622] [ClusterSystem-akka.remote.default-remote-dispatcher-6] [akka.serialization.Serialization(akka://ClusterSystem)] Using the default Java serializer for class [cn.sunxiang0918.akka.demo7.FactorialResult] which is not recommended because of performance implications. Use another serializer or disable this warning using the setting 'akka.actor.warn-about-java-serializer-usage'
[WARN] [02/11/2016 22:45:14.622] [ClusterSystem-akka.remote.default-remote-dispatcher-5] [akka.serialization.Serialization(akka://ClusterSystem)] Using the default Java serializer for class [cn.sunxiang0918.akka.demo7.FactorialResult] which is not recommended because of performance implications. Use another serializer or disable this warning using the setting 'akka.actor.warn-about-java-serializer-usage'
Result: FactorialResult{n=10, factorial=3628800}

Reposted from: https://my.oschina.net/xiaohui249/blog/829884
