akka for java
目前火热的两大计算框架 Spark 和 Flink 底层的通讯原理使用的都是 akka(当前的 Spark 已经背叛了akka,转投 netty),而网上对 akka 的教程是在太少,推荐 github 上的 《Akka 中文指南》,这是为数不多的参考教程,但是它的入门案例属实劝退,因此再看了一部分后总结出我认为比较好入门的知识点组成了这篇文章,算是对《Akka 中文指南》食用前的开胃菜。如果您对它的Java快速入门很感兴趣,且能很好地理解它的代码,那么这篇文章你可以跳过去了。
Akka 是一个用 Scala 编写的库,用于在 JVM 平台上简化编写具有可容错的、高可伸缩性的 Java 和 Scala 的 Actor 模型应用,其同时提供了Java 和 Scala 的开发接口。Akka 允许我们专注于满足业务需求,而不是编写初级代码。在 Akka 中,Actor 之间通信的唯一机制就是消息传递。Akka 对 Actor 模型的使用提供了一个抽象级别,使得编写正确的并发、并行和分布式系统更加容易。Actor 模型贯穿了整个 Akka 库,为我们提供了一致的理解和使用它们的方法。Akka 主要解决的问题是:可以轻松的写出高效稳定的并发程序,程序员不再过多的考虑线程、锁和资源竞争等细节。
一、Akka 必会概念
1.1 Actor 中 Actor 模型
- Akka 处理并发的方式基于 Actor 模型,如上图
- 在基于 Actor 的系统里,所有的事物都是 Actor,就好像面向对象设计里面一切皆对象
- Actor 模型是作为一个并发模型,Actor 与 Actor 之间只能通过消息进行通信,如图信封
- Actor 向 Actor 发送消息时必须获取对象的引用即:ActorRef,就好像张三给李四打电话必须知道李四的电话号码一样
- Actor 向 Actor 发送的消息并不是直接给到 Actor 的,而是统一发送到对应 Actor 内部的 Mailbox(内部封装,用户不可见),由 Mailbox 转发给 Actor
- Mailbox 可以识别出每个消息的发送者
- 当一个 Actor 给另外一个 Actor 发消息,消息是有序的,多个 Actor 之间顺序不保证
- 发送消息的 Actor 可以等待消息的响应也可以异步处理
- ActorSystem 的职责是负责并管理其创建的 Actor,一个进程中的 ActorSystem 是单例的,Actor 可以有多个
1.2 Actor 引用
即 ActorRef,最重要的功能就是是通过 ActorRef 进行发消息,每个 Actor 可以通过getSelf()
获取自身的引用,也可以在消息处理阶段通过 getSender()
获取发送消息的 Actor 引用
根据 ActorSystem 的配置,支持不同类型的 Actor 引用
- 纯本地 Actor 引用:未配置网络功能的 ActorSystem 使用。
- 远程 Actor 引用:支持网络功能的 ActorSystem 使用,其引用包含协议和远程寻址信息
- 特殊 Actor 引用:DeadLetterActorRef(死信)、EmptyLocalActorRef(查找不存在的本地引用是返回)
1.3 Actor 路径
类似 Zookeeper 的结构,ActorSystem 存在一个根目录,其名称为 /,下一级包括:
- /user:所有用户创建的 Actor 的顶级 Actor,即我们创建的 Actor 都在 /user 下
- /system:所有系统创建的 Actor 的顶级 Actor,如系统的日志监听器
- /deadletters:死信 Actor,即所有发送到已停止或不存在的 Actor 的消息都会重新路由这里
- /tmp:所有短期系统创建的 Actor 的守护者 Actor
- /remote:一个人工路径,其下面所有 Actor 的监督者都是远程 Actor 引用
二、Akka for Java
引入依赖 pom.xml
<project><modelVersion>4.0.0</modelVersion><groupId>hello-akka-java</groupId><artifactId>app</artifactId><version>1.0</version><properties><akka.version>2.6.19</akka.version></properties><dependencies><dependency><groupId>com.typesafe.akka</groupId><artifactId>akka-actor_2.12</artifactId><version>${akka.version}</version></dependency></dependencies>
</project>
从上面的描述创建一个入门的 akka 程序基本步骤如下:
- 创建 ActorSystem
- 创建若干个 Actor
- Actor 之间发消息与接收消息的逻辑处理
2.1 ActorSystem
创建 ActorSystem 最简单的方式
ActorSystem system = ActorSystem.create("demo"); // 传一个系统名称
2.2 Actor
通过继承 AbstractActor 来获取,例如:创建一个 JobManager 和 TaskManager 的 Actor
JobManager
package tech.kpretty;import akka.actor.AbstractActor;/*** @author wjun* @date 2022/6/19 16:17* @email wjunjobs@outlook.com* @describe */
public class JobManager extends AbstractActor {/*** 重写消息接收的方法** @return Receive,封装了对不同消息的处理逻辑*/@Overridepublic Receive createReceive() {// 对所有的消息不做任何响应return receiveBuilder().build();}
}
TaskManager
package tech.kpretty;import akka.actor.AbstractActor;/*** @author wjun* @date 2022/6/19 16:17* @email wjunjobs@outlook.com* @describe*/
public class TaskManager extends AbstractActor {/*** 重写消息接收的方法** @return Receive,封装了对不同消息的处理逻辑*/@Overridepublic Receive createReceive() {// 对所有的消息不做任何响应return receiveBuilder().build();}
}
1.3 tell & receive
上面说过发送消息需要获取 ActorRef,通常有两种方式:创建的时候会返回 ActorRef、通过 ActorSystem 给定路劲搜索获取 ActorRef,即:一个是针对不存在的 Actor 需要创建,一个是针对已经存在的 Actor。
方式一:创建后返回 ActorRef
// ActorSystem
def actorOf(props: Props, name: String): ActorRef
需要一个 Props 配置类,用于指定创建 Actor 的选项,例如:
package tech.kpretty;import akka.actor.ActorSystem;
import akka.actor.Props;/*** @author wjun* @date 2022/6/19 16:20* @email wjunjobs@outlook.com* @describe 测试通信的入口*/
public class ApplicationMaster {public static void main(String[] args) {ActorSystem system = ActorSystem.create("demo");system.actorOf(Props.create(JobManager.class, () -> new JobManager()));}
}
注:这是一个非常危险的方式,破坏了 Actor 的封装性,因此建立使用下面的方式
每个 Actor 给出自己的静态 Props 创建方法,例如 JobManager
package tech.kpretty;import akka.actor.AbstractActor;
import akka.actor.Props;/*** @author wjun* @date 2022/6/19 16:17* @email wjunjobs@outlook.com* @describe*/
public class JobManager extends AbstractActor {// 提供 Props 的静态方法,同时还可以传入一些参数,作为实例化 JobManager 的参数static Props props() {return Props.create(JobManager.class, JobManager::new);}/*** 重写消息接收的方法** @return Receive,封装了对不同消息的处理逻辑*/@Overridepublic Receive createReceive() {// 对所有的消息不做任何响应return receiveBuilder().build();}
}
修改 ApplicationMaster
package tech.kpretty;import akka.actor.ActorRef;
import akka.actor.ActorSystem;/*** @author wjun* @date 2022/6/19 16:20* @email wjunjobs@outlook.com* @describe 测试通信的入口*/
public class ApplicationMaster {public static void main(String[] args) {ActorSystem system = ActorSystem.create("demo");ActorRef jobManager = system.actorOf(JobManager.props(), "jobManager");// 打印 actor pathSystem.out.println(jobManager);}
}
下面编写 JobManager 接收消息的逻辑,例如:当接收到 init 时,打印 hello xxx,我是 xxx,分别打印发送者的名字和自己的名字,重写 createReceive 即可
/*** 重写消息接收的方法** @return Receive,封装了对不同消息的处理逻辑*/
@Override
public Receive createReceive() {// 对所有的消息不做任何响应return receiveBuilder().matchEquals("init", message -> System.out.printf("hello %s,I'm %s%n", sender().path().name(), self().path().name())).build();
}
通过 ActorRef.tell(message,ActorRef) 发送信息,其中 ActorRef 即为消息的发送者,我们这里是独立于 Akka 系统之外的角色来给 JobManager 发送消息,因此可以使用 ActorRef.noSender 表示没有 Actor
package tech.kpretty;import akka.actor.ActorRef;
import akka.actor.ActorSystem;/*** @author wjun* @date 2022/6/19 16:20* @email wjunjobs@outlook.com* @describe 测试通信的入口*/
public class ApplicationMaster {public static void main(String[] args) {ActorSystem system = ActorSystem.create("demo");ActorRef jobManager = system.actorOf(JobManager.props(), "jobManager");// 打印 actor pathSystem.out.println(jobManager);// 发送消息jobManager.tell("init",ActorRef.noSender());}
}
结果如下:
可以看出 ActorRef.noSender() 返回的是一个死信的 ActorRef
1.4 Actor 的生命周期方法
继承 AbstractActor 后可以重写:
- preStart:Actor 创建时自动异步启动
- postStop:getContext.stop(ActorRef)时调用
- preRestart:Actor 重启前调用,用于清理崩溃的数据
- postRestart:Actor 重启后调用,用于崩溃后的初始化,默认调用 preStart
三、模拟Flink心跳检测
需求是:启动 JobManager 后,发送 init 请求,JobManager 创建若干个 TaskManager,TaskManager每 5 秒向 JobManager 发送心跳检测,若 10 秒都没有接收到心跳则打印 xxx 已停止
3.1 封装消息类型
封装 init 消息携带创建 TaskManager 个数
package tech.kpretty;/*** @author wjun* @date 2022/6/19 17:10* @email wjunjobs@outlook.com* @describe*/
public class InitRequest {private final int taskManagerNumber;public InitRequest(int taskManagerNumber) {this.taskManagerNumber = taskManagerNumber;}public int getTaskManagerNumber() {return taskManagerNumber;}
}
封装 TaskManager 请求
package tech.kpretty;/*** @author wjun* @date 2022/6/19 17:26* @email wjunjobs@outlook.com* @describe*/
public class TaskManagerRequest {private final String type;private final long ts;public TaskManagerRequest(String type, long ts) {this.type = type;this.ts = ts;}public String getType() {return type;}public long getTs() {return ts;}
}
3.2 JobManager
处理 InitRequest 请求,创建 TaskManager
/*** 重写消息接收的方法** @return Receive,封装了对不同消息的处理逻辑*/
@Override
public Receive createReceive() {return receiveBuilder().match(InitRequest.class, message -> {System.out.println("开始初始化...");for (int i = 0; i < message.getTaskManagerNumber(); i++) {getContext().actorOf(TaskManager.props(), "taskManager-" + i);}}).build();
}
当 TaskManager 创建完成后发送 init 请求,JobManager 将当前 TaskManage 创建的时间保存起来
处理心跳逻辑,当 JobManager 启动时创建一个 ConcurrentHashMap 用来保存 TaskManager 的心跳时间,再启动一个线程用于定时检测
private ConcurrentHashMap<ActorRef, Long> heartbeat;
private volatile boolean isRunning = false;@Override
public void preStart() throws Exception {heartbeat = new ConcurrentHashMap<>();new Thread(() -> {while (true) {if (isRunning) checkHeartbeat();try {TimeUnit.SECONDS.sleep(10);} catch (InterruptedException e) {// no-op}}}).start();
}private void checkHeartbeat() {long currentTimeMillis = System.currentTimeMillis();Enumeration<ActorRef> keys = heartbeat.keys();while (keys.hasMoreElements()) {ActorRef actorRef = keys.nextElement();if (currentTimeMillis - 5000 > heartbeat.get(actorRef)) {System.out.println(actorRef + "已经挂掉了,尝试关闭它");getContext().stop(actorRef);}}
}
最终 JobManager 代码如下
package tech.kpretty;import akka.actor.AbstractActor;
import akka.actor.ActorRef;
import akka.actor.Props;
import scala.Option;import java.util.Enumeration;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.TimeUnit;/*** @author wjun* @date 2022/6/19 16:17* @email wjunjobs@outlook.com* @describe*/
public class JobManager extends AbstractActor {private ConcurrentHashMap<ActorRef, Long> heartbeat;private volatile boolean isRunning = false;@Overridepublic void preStart() throws Exception {heartbeat = new ConcurrentHashMap<>();new Thread(() -> {while (true) {if (isRunning) checkHeartbeat();try {TimeUnit.SECONDS.sleep(10);} catch (InterruptedException e) {// no-op}}}).start();}private void checkHeartbeat() {long currentTimeMillis = System.currentTimeMillis();Enumeration<ActorRef> keys = heartbeat.keys();while (keys.hasMoreElements()) {ActorRef actorRef = keys.nextElement();if (currentTimeMillis - 5000 > heartbeat.get(actorRef)) {System.out.println(actorRef + "已经挂掉了,尝试关闭它");getContext().stop(actorRef);}}}// 提供 Props 的静态方法,同时还可以传入一些参数,作为实例化 JobManager 的参数static Props props() {return Props.create(JobManager.class, JobManager::new);}/*** 重写消息接收的方法** @return Receive,封装了对不同消息的处理逻辑*/@Overridepublic Receive createReceive() {// 对所有的消息不做任何响应return receiveBuilder().match(InitRequest.class, message -> {System.out.println("开始初始化...");for (int i = 0; i < message.getTaskManagerNumber(); i++) {getContext().actorOf(TaskManager.props(), "taskManager-" + i);}}).match(TaskManagerRequest.class, message -> {if ("init".equals(message.getType())) {heartbeat.put(sender(), message.getTs());System.out.println("收到 " + sender() + " init 信息");if (!isRunning) // 只要有一个 TaskManager 启动了 就开始进行心跳检测isRunning = true;} else if ("heartbeat".equals(message.getType())) {heartbeat.put(sender(), message.getTs());System.out.println("收到 " + sender() + " heartbeat 信息");}}).matchEquals("stop", message -> {isRunning = false;getContext().stop(self());}).build();}
}
3.3 TaskManager
TaskManager 难点在于如何给 JobManager 发消息,即如何获取 JobManager 的 ActorRef,这时候需要用到获取 ActorRef 的第二种方式,对于已经存在的 Actor 可以使用 actorSelection 方法传入 Actor 的路径即可,因为 TaskManager 是 JobManager 创建的,因此它们的关系如下:
因此 TaskManager 给 JobManager 发送消息代码如下:
getContext().actorSelection(self().path().parent()).tell(new TaskManagerRequest("heartbeat", System.currentTimeMillis()), self());
TaskManager 完整代码如下:
package tech.kpretty;import akka.actor.AbstractActor;
import akka.actor.ActorRef;
import akka.actor.Props;import java.util.Enumeration;
import java.util.concurrent.TimeUnit;/*** @author wjun* @date 2022/6/19 16:17* @email wjunjobs@outlook.com* @describe*/
public class TaskManager extends AbstractActor {private volatile boolean isRunning = false;// 提供 Props 的静态方法,同时还可以传入一些参数,作为实例化 JobManager 的参数static Props props() {return Props.create(TaskManager.class, TaskManager::new);}@Overridepublic void preStart() throws Exception {System.out.println("开始启动" + self());TimeUnit.SECONDS.sleep((int) (Math.random() * 5));// 告诉 JobManager 启动好了,汇报当前时间getContext().actorSelection(self().path().parent()).tell(new TaskManagerRequest("init", System.currentTimeMillis()), self());System.out.println("启动完成" + self());isRunning = true;new Thread(() -> {while (true) {if (isRunning)sendHeartbeat();try {TimeUnit.SECONDS.sleep(5);} catch (InterruptedException e) {// no-op}}}).start();}@Overridepublic void postStop() throws Exception {System.out.println(self() + "关闭中...");}private void sendHeartbeat() {// 告诉 JobManager 启动好了,汇报当前时间System.out.println(self() + " 发送心跳数据");getContext().actorSelection(self().path().parent()).tell(new TaskManagerRequest("heartbeat", System.currentTimeMillis()), self());}/*** 重写消息接收的方法** @return Receive,封装了对不同消息的处理逻辑*/@Overridepublic Receive createReceive() {// 对所有的消息不做任何响应return receiveBuilder().matchEquals("fail", message -> isRunning = false).build();}
}
接下来就是主函数的编写,比如发送一些消息给 JobManager、TaskManager 等
package tech.kpretty;import akka.actor.ActorPath;
import akka.actor.ActorRef;
import akka.actor.ActorSystem;import java.util.Random;
import java.util.Scanner;/*** @author wjun* @date 2022/6/19 16:20* @email wjunjobs@outlook.com* @describe 测试通信的入口*/
public class ApplicationMaster {public static void main(String[] args) {ActorSystem system = ActorSystem.create("demo");ActorRef jobManager = system.actorOf(JobManager.props(), "jobManager");Scanner scanner = new Scanner(System.in);while (true) {System.out.print(">> ");String op = scanner.nextLine();switch (op) {case "init":jobManager.tell(new InitRequest(3), ActorRef.noSender());break;case "fail":ActorPath child = jobManager.path().child("taskManager-" + new Random().nextInt(3));system.actorSelection(child).tell("fail", ActorRef.noSender());break;case "stop":jobManager.tell("stop", ActorRef.noSender());System.exit(0);}}}
}
测试结果如下
ps: 这个案例只是为了演示 akka 运行的基本原理,案例的心跳检测存在很多 bug,作为 akka 的入门案例是比较合适的,有了 akka 的基本使用经验再去看 flink 源码会有不错的收获
akka for java相关推荐
- Akka in JAVA(三)
2019独角兽企业重金招聘Python工程师标准>>> Akka in JAVA(三) 上两个部分讲了Akka的基本知识和常见的用法.接下来讲一讲Akka的远程调用以及集群的使用.因 ...
- java akka_用于大型事件处理的Akka Java
java akka 我们正在设计一个大型的分布式事件驱动系统,用于跨事务数据库的实时数据复制. 来自源系统的数据(消息)在到达目的地之前经历了一系列转换和路由逻辑. 这些转换是多进程和多线程的操作,包 ...
- 用于大型事件处理的Akka Java
我们正在设计一个大型的分布式事件驱动系统,用于跨事务数据库的实时数据复制. 来自源系统的数据(消息)在到达目的地之前经历了一系列转换和路由逻辑. 这些转换是多进程和多线程的操作,包括可以同时执行的较小 ...
- akka java api中文_akka入门 (1):akka简介
这个akka入门系列大量参考了akka文档.主要是翻译+自己的一些理解.这里对akka文档吐一下槽,它的文档初看上去感觉挺详细的.但是很多示例代码都是片段,对初学者来说甚至都无法运行起来,所以我这里会 ...
- java akka 实战_akka集群实战
关于akka akka是JAVA虚拟机JVM平台上构建高并发.分布式和容错应用的工具包和运行时.Akka用Scala语言写成,同时提供了Scala和JAVA的开发接口. Akka处理并发的方法基于Ac ...
- Akka的好用例[关闭]
本文翻译自:Good use case for Akka [closed] I have heard lots of raving about Akka framework (Java/Scala s ...
- java 协程线程的区别_为什么 Java 坚持多线程不选择协程?
谢邀. 先说结论:协程是非常值得学习的概念,它是多任务编程的未来.但是Java全力推进这个事情的动力并不大. 先返回到问题的本源.当我们希望引入协程,我们想解决什么问题.我想不外乎下面几点:节省资源, ...
- akka---Getting Started Tutorial (Java): First Chapter
原文地址:http://doc.akka.io/docs/akka/2.0.2/intro/getting-started-first-java.html Introduction Welcome t ...
- [转] AKKA简介
[From] https://blog.csdn.net/linuxarmsummary/article/details/79399602 Akka in JAVA(一) AKKA简介 什么是AKKA ...
最新文章
- 满足极高读写性能需求的Key-Value数据库
- iOS SDWebImage 缓存机制与缓存策略
- IDA+GDB远程调试android平台 - Aarch64- elf64程序
- linux 非登录shell自动,Linux登录shell和非登录(交互式shell)环境变量配置
- spring environment_SpringBoot实战8-Spring基础-应用环境
- 八十四、SpringBoot微服务Dubbo和Zookeeper分布式
- 人工智能大地图之分布式人工智能篇
- Hadoop平台 以Parcel包安装CDH
- JAVA JDK windows环境搭建
- 2012-10-29 → 2012-11-11 周总结:项目试运行(考验的时候到了),总算解决了WCF慢的问题了...
- [Java] 蓝桥杯 BEGIN-2 入门训练 序列求和
- 7个JavaScript在IE和Firefox浏览器下的差异写法
- [转载] python将图片进行base64编码, 解码
- SQL server 2008 T-sql 总结
- uchar与char
- 中小型企业无线网络设计
- 车辆销售系统用例_汽车销售管理系统UML建模分析.doc
- ORACLE 提取汉字拼音首字母及汉字排序
- 您尝试安装的Adobe Flash Player版本不是最新版本解决办法
- c mysql_stmt游标移动_MySql数据库--stmt语句(续)