Akka 学习(四)Remote Actor
目录
- 一 介绍
- 1.1 Remote Actor
- 1.2 适用场景
- 1.3 踩坑点
- 二 实战
- 2.1 需求
- 2.2 Java 版本
- 2.2.1 效果图
- 2.2.2 实体类
- 2.2.3 服务端Actor 处理
- 2.2.4 服务端配置文件
- 2.2.5 客服端Actor处理
- 2.2.6 客服端配置文件
- 2.2.7 测试
- 2.3 Scala 版本
- 2.3.1 效果
- 2.2.3 服务端Actor处理
- 2.3.4 客户端Actor处理
- 2.3.5 测试
参考文章
- Gitter Chat,Akka 在线交流平台
- Akka Forums,Akka 论坛
- Akka in GitHub,Akka 开源项目仓库
- Akka Official Website,Akka 官网
- Akka Java API,Akka 应用程序编程接口
- 《Akka入门与实践》 [加]Jason Goodwin(贾森·古德温)
文章系列
- Akka 学习(一)Actor 初步认识与环境搭建 已完成
- Akka 学习(二)第一个入门程序 已完成
- Akka 学习(三)Actor的基本使用 已完成
- Akka 学习(四)Remote Actor 已完成
- Akka 学习(五)消息传递的方式 已完成
- Akka 学习(六)Actor的监督机制 已完成
- Akka 学习(七)Actor的生命周期 已完成
- Akka 学习(八)路由与Dispatcher 已完成
- Akka 学习(九)Akka Cluster 已完成
Akka 基础篇就此结束了,Akka基础篇主要介绍Akka的基本概念与一些基本术语,使用方式
代码:https://github.com/Eason-shu/Akka
一 介绍
1.1 Remote Actor
虽然Akka在单机上可以运行上百万的Actor,但出于容错、负载均衡、灰度发布、提高并行度等等原因,我们仍然需要能在多个不同的服务器上运行Actor。所以Akka提供了akka-remoting的扩展包,屏蔽底层网络传输的细节,让上层以及其简单的方式使用远程的Actor调度。
Akka Remoting 是一个以点对点方式连接 actor 系统的通信模块,它是 Akka 集群的基础。远程处理的设计由两个(相关的)设计决策驱动:
- 相关系统之间的通信是对称的:如果系统 A 可以连接到系统 B,那么系统 B 也必须能够独立连接到系统 A。
- 通信系统的角色在连接模式方面是对称的:没有只接受连接的系统,也没有只发起连接的系统。
这些决定的结果是不可能安全地创建具有预定义角色的纯客户端-服务器设置(违反假设 2)。对于客户端-服务器设置,最好使用 HTTP 或 Akka I/O。
重要提示:使用涉及网络地址转换、负载平衡器或 Docker 容器的设置违反了假设 1,除非在网络配置中采取额外步骤以允许相关系统之间的对称通信。在这种情况下,Akka 可以配置为绑定到与用于在 Akka 节点之间建立连接的地址不同的网络地址。请参阅NAT 后面或 Docker 容器中的 Akka。
1.2 适用场景
- remoting的存在其实是为akka cluster做底层支持的,通常并不会直接去使用remoting的包。但为了了解cluster的底层原理,还是有必要看下remoting。
- 同时,remoting被设计为Peer-to-Peer而非Client-Server,所以不适用于基于后者的系统开发,比如我们无法在一个provider为local的Actor里去查找一个remote actor发送消息,必须两者均为remote actor,才满足对等。
1.3 踩坑点
- Akka版本需要与Scala版本匹配
maven仓库地址:
注意版本匹配,不然会疯狂报错,运行不起来
我的版本:
scala:2.13.0
Akka版本:2.13
- 依赖
<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 https://maven.apache.org/xsd/maven-4.0.0.xsd"><modelVersion>4.0.0</modelVersion><groupId>com.hc</groupId><artifactId>ActorDemo03</artifactId><version>0.0.1-SNAPSHOT</version><name>ActorDemo03</name><description>Demo project for Spring Boot</description><properties><java.version>1.8</java.version><scala.version>2.11.7</scala.version></properties><dependencies><dependency><groupId>log4j</groupId><artifactId>log4j</artifactId><version>1.2.17</version></dependency><!-- https://mvnrepository.com/artifact/com.typesafe.akka/akka-actor --><dependency><groupId>com.typesafe.akka</groupId><artifactId>akka-actor_2.13</artifactId><version>2.5.23</version></dependency><dependency><groupId>com.typesafe.akka</groupId><artifactId>akka-remote_2.13</artifactId><version>2.5.23</version></dependency><dependency><groupId>com.typesafe.akka</groupId><artifactId>akka-testkit_2.13</artifactId><version>2.5.23</version></dependency><dependency><groupId>org.scala-lang.modules</groupId><artifactId>scala-java8-compat_2.11</artifactId><version>1.0.2</version></dependency><dependency><groupId>junit</groupId><artifactId>junit</artifactId><version>4.13.2</version><scope>test</scope></dependency><dependency><groupId>org.testng</groupId><artifactId>testng</artifactId><version>RELEASE</version><scope>compile</scope></dependency></dependencies><build><plugins><plugin><groupId>org.apache.maven.plugins</groupId><artifactId>maven-compiler-plugin</artifactId><configuration><source>8</source><target>8</target></configuration></plugin></plugins></build></project>
二 实战
2.1 需求
- 我们还将创建一个数据库客户端,用于展示如何请求服务器,以及如何从远程Actor中获取Future。
- 服务器端的服务接收到客户端的请求后将返回Future。
- 这样我们就已经编写了一个可以使用的键值存储数据库(和redis很类似)以及一个可以使用该数据库的远程客户端。
2.2 Java 版本
2.2.1 效果图
- 服务端启动前
- 客服端启动
- 服务端收到请求后
2.2.2 实体类
- SetRequest
package pojo;import java.io.Serializable;/*** @description: 设置消息* @author: shu* @createDate: 2022/11/28 11:51* @version: 1.0*/
public class SetRequest implements Serializable {public final String key;public final Object value;public SetRequest(String key, Object value) {this.key = key;this.value = value;}}
- GetRequest
package pojo;import java.io.Serializable;/*** @description: 获取消息* @author: shu* @createDate: 2022/11/28 11:52* @version: 1.0*/
public class GetRequest implements Serializable {public final String key;public GetRequest(String key) {this.key = key;}
}
- 错误消息
package pojo;import java.io.Serializable;/*** @description:* @author: shu* @createDate: 2022/11/28 11:52* @version: 1.0*/
public class KeyNotFoundException extends Exception implementsSerializable {public final String key;public KeyNotFoundException(String key) {this.key = key;}
}
2.2.3 服务端Actor 处理
import akka.actor.*;
import akka.event.Logging;
import akka.event.LoggingAdapter;
import akka.japi.pf.ReceiveBuilder;
import pojo.GetRequest;
import pojo.KeyNotFoundException;
import pojo.SetRequest;import java.util.HashMap;
import java.util.Map;/*** @description:* @author: shu* @createDate: 2022/11/28 12:31* @version: 1.0*/
public class RequestActor extends AbstractActor {LoggingAdapter log = Logging.getLogger(getContext().system(), this);protected final Map<String, Object> map = new HashMap<>();@Overridepublic void preStart() throws Exception {log.info("ToFindRemoteActor is starting");}@Overridepublic Receive createReceive() {return ReceiveBuilder.create()// 设置消息.match(SetRequest.class, message -> {// 打印消息log.info("Received Set request: {}", message.key);// 缓存消息map.put(message.key, message.value);// 回应消息sender().tell(new Status.Success(message.key), self());})// 得到消息.match(GetRequest.class, message -> {// 打印日志log.info("Received Get request: {}", message.key);// 获取消息Object value = (Object) map.get(message.key);Object response = (value!= null)? value: new Status.Failure(new KeyNotFoundException(message.key));// 响应消息sender().tell(response, self());})// 未找到消息.matchAny(o ->sender().tell(new Status.Failure(new ClassNotFoundException()), self())).build();}/*** 测试* @param args*/public static void main(String[] args) {// Config config = ConfigFactory.parseString(
// "akka.remote.netty.tcp.port=" + 2551)
// .withFallback(ConfigFactory.load("application.conf"));// Create an Akka systemActorSystem system = ActorSystem.create("akkademy");// Create an actorActorRef ref = system.actorOf(Props.create(RequestActor.class), "akkademy-db");System.out.println(ref);}}
2.2.4 服务端配置文件
akka {stdout-loglevel = "DEBUG"loglevel = "DEBUG"actor {provider = "akka.remote.RemoteActorRefProvider"
}
remote {enabled-transports = ["akka.remote.netty.tcp"]netty.tcp {hostname = "127.0.0.1"
}
}
log-sent-messages = on
log-received-messages = on
}
2.2.5 客服端Actor处理
package client;import akka.actor.ActorRef;
import akka.actor.ActorSelection;
import akka.actor.ActorSystem;
import akka.pattern.AskableActorSelection;
import akka.util.Timeout;
import pojo.GetRequest;
import pojo.SetRequest;import java.util.concurrent.CompletionStage;
import java.util.concurrent.TimeUnit;import static scala.compat.java8.FutureConverters.toJava;/**
* @description:
* @author: shu
* @createDate: 2022/11/28 16:10
* @version: 1.0
*/
public class JClient {private final ActorSystem system = ActorSystem.create("LocalSystem");private final ActorSelection remoteDb;public JClient(String remoteAddress) {remoteDb = system.actorSelection("akka.tcp://akkademy@" +remoteAddress + "/user/akkademy-db");}/**
* 缓存消息
* @param key
* @param value
* @return
*/public CompletionStage set(String key, Object value) {return toJava(new AskableActorSelection(remoteDb).ask(new SetRequest(key, value), Timeout.apply(5000, TimeUnit.SECONDS)));}/**
* 获取缓存消息
* @param key
* @return
*/public CompletionStage get(String key){return toJava(new AskableActorSelection(remoteDb).ask(new GetRequest(key), Timeout.apply(5000, TimeUnit.SECONDS)));}}
import akka.actor.ActorRef;
import akka.actor.ActorSelection;
import akka.actor.ActorSystem;
import client.JClient;
import org.junit.Test;
import pojo.SetRequest;import java.util.concurrent.CompletableFuture;
public class AkkademyDbTest {/*** 测试注意需要放在不同的两个项目进行测试,不然会Caused by: java.net.BindException: Address already in use: bind* @throws Exception*/@Testpublic void itShouldSetRecord() throws Exception {JClient client = new JClient("127.0.0.1:2552");client.set("123", 123);Integer result = (Integer) ((CompletableFuture) client.get("123")).get();System.out.println("获取的结果:"+result);assert(result == 123);}}
2.2.6 客服端配置文件
akka {stdout-loglevel = "DEBUG"loglevel = "DEBUG"actor {provider = "akka.remote.RemoteActorRefProvider"
}
remote {enabled-transports = ["akka.remote.netty.tcp"]netty.tcp {hostname = "127.0.0.1"port = 0
}
log-sent-messages = on
log-received-messages = on
}
}
2.2.7 测试
注意:需要放在两个不一样的配置项目中
- 服务端项目启动,等待请求的到来,注意配置文件
- 客服端项目启动,发送AKKA请求给服务端
- 服务端收到客服端的请求,缓存请求数据,把缓存结果返回给客服端
2.3 Scala 版本
2.3.1 效果
- 启动服务前
- 客户端启动
- 服务端收到请求
2.2.3 服务端Actor处理
import akka.actor.{Actor, Status}
import akka.event.{Logging, LoggingAdapter}import scala.collection.convert.ImplicitConversions.`map AsJavaMap`
import scala.collection.mutable/*** @description:* @author: shu* @createDate: 2022/11/28 12:41* @version: 1.0*/
class ScalaRequest extends Actor {protected val log: LoggingAdapter = Logging.getLogger(context.system, this)val map: mutable.Map[String, Object] = new mutable.HashMap[String, Object]override def receive = {case SetRequest(key, value) =>log.info("received SetRequest - key: {} value: {}", key, value)map.put(key, value)sender() ! Status.Successcase GetRequest(key) =>log.info("received GetRequest - key: {}", key)val response: Option[Object] = map.get(key)response match{case Some(x) => sender() ! xcase None => sender() ! Status.Failure(new KeyNotFoundException(key))}case o => Status.Failure(new ClassNotFoundException)}}case class SetRequest(key: String, value: Object)
case class GetRequest(key: String)
case class KeyNotFoundException(key: String) extends Exception
配置文件跟Java一样
2.3.4 客户端Actor处理
import akka.actor.ActorSystem
import akka.pattern.ask
import akka.util.Timeoutimport scala.concurrent.duration.DurationInt
import scala.language.postfixOps/*** @description:* @author: shu* @createDate: 2022/12/1 11:43* @version: 1.0*/
class SClient(remoteAddress: String) {private implicit val timeout = Timeout(2 seconds)private implicit val system = ActorSystem("LocalSystem")private val remoteDb = system.actorSelection(s"akka.tcp://akkademy@$remoteAddress/user/akkademy-db")def set(key: String, value: Object) = {remoteDb ? SetRequest(key, value)}def get(key: String) = {remoteDb ? GetRequest(key)}
}case class SetRequest(key: String, value: Object)
case class GetRequest(key: String)
case class KeyNotFoundException(key: String) extends Exception
import scala.concurrent.duration.DurationInt
import scala.concurrent.Await
import scala.language.postfixOps/*** @description:* @author: shu* @createDate: 2022/12/1 11:44* @version: 1.0*/object Main extends App {val client = new SClient("127.0.0.1:2552")client.set("123", new Integer(123))val futureResult = client.get("123")val result = Await.result(futureResult, 10 seconds)}
2.3.5 测试
注意:需要放在两个不一样的配置项目中
- 服务端项目启动,等待请求的到来,注意配置文件
- 客服端项目启动,发送AKKA请求给服务端
- 服务端收到客服端的请求,缓存请求数据,把缓存结果返回给客服端
Akka 学习(四)Remote Actor相关推荐
- akka学习教程(十四) akka分布式实战
akka系列文章目录 akka学习教程(十四) akka分布式实战 akka学习教程(十三) akka分布式 akka学习教程(十二) Spring与Akka的集成 akka学习教程(十一) akka ...
- akka学习教程(十三) akka分布式
akka系列文章目录 akka学习教程(十四) akka分布式实战 akka学习教程(十三) akka分布式 akka学习教程(十二) Spring与Akka的集成 akka学习教程(十一) akka ...
- (转)Akka学习笔记
Akka学习笔记系列文章: <Akka学习笔记:ACTORS介绍> <Akka学习笔记:Actor消息传递(1)> <Akka学习笔记:Actor消息传递(2)> ...
- Akka 学习(九)Akka Cluster
参考文章 Gitter Chat,Akka 在线交流平台 Akka Forums,Akka 论坛 Akka in GitHub,Akka 开源项目仓库 Akka Official Website,Ak ...
- Akka系列---什么是Actor
本文已.Net语法为主,同时写有Scala及Java实现代码 严肃的说,演员是一个广泛的概念,作为外行人我对Actor 模型的定义: Actor是一个系统中参与者的虚拟人物,Actor与Actor之间 ...
- C#多线程学习(四) 多线程的自动管理(线程池) (转载系列)——继续搜索引擎研究...
在多线程的程序中,经常会出现两种情况: 一种情况: 应用程序中,线程把大部分的时间花费在等待状态,等待某个事件发生,然后才能给予响应 这一般使用ThreadPo ...
- python学习四(处理数据)
python学习四(处理数据) head first python中的一个数据处理的例子 有四个U10选手的600米成绩,请取出每个选手跑的最快的3个时间.以下是四位选手的9次成绩 James 2-3 ...
- PyTorch框架学习四——计算图与动态图机制
PyTorch框架学习四--计算图与动态图机制 一.计算图 二.动态图与静态图 三.torch.autograd 1.torch.autograd.backward() 2.torch.autogra ...
- Docker学习四:Docker 网络
前言 本次学习来自于datawhale组队学习: 教程地址为: https://github.com/datawhalechina/team-learning-program/tree/master/ ...
最新文章
- 汇总 Linux下获取详细硬件信息的工具:Dmidecode命令详解
- iovec结构体定义及使用
- 嵌入式linux开发环境搭建——VirtualBox虚拟机网络环境解析
- Ajax的异步,是鸡肋还是鸡排?
- 序列代码UVa 111 History Grading (最长公共子序列)
- oracle pls 00905,Oracle数据库存储过程出错了!大神帮忙看下什么问题!急急急!...
- 指数函数图像怎么画?
- 结构梁配筋最牛插件_结构分析|结构抗震概念——强柱弱梁
- JavaFX屏幕截图工具ScreenCapture
- PDA用ActiveSync同步上网 方法
- 跳转到高德地图或百度地图或高德网页导航
- “阻塞(pend)”与“挂起(suspend)”的区别?
- HDU-1814 Peaceful Commission (2-SAT暴力模板 暴力染色+字典序最小)
- 等了3个月终于来啦!传智播客C/C++视频教程开始更新喽~
- C语言 输入一个5行5列的数组。1.求数组主对角线上元素的和。2.求出辅对角线上元素的积。3.找出主对角线上最大的值及其位置
- 冰刃·笔记 | 勒索病毒的10%和90%等式
- 啊哈添柴挑战Java1826. 顺序输出(简单)
- c语言有趣代码,分享一段有趣的小代码
- java判断是平年还是闰年
- iis/apache + php5 + mysql5_Windows下IIS6/Apache2.2.4+MySQL5.2+PHP5.2.6安装配置方法
热门文章
- 怎样在表格中选出同一类_怎样将一个excel表格里头的多个同一个名称下面的多个数据筛选出来...
- html5 微信墙,HTML5服务器端推送事件 解决PHP微信墙推送问题
- 二维码生成器和二维码分享
- 华为鸿蒙系统使用技巧,【图片】华为鸿蒙系统的厉害之处在于 你可能非用不可
!【手机吧】_百度贴吧...
- steam俄罗斯钓鱼4游戏服务器未响应,steam游戏推荐:《俄罗斯钓鱼4》一款真实的钓鱼游戏...
- 关于java 计算器设计,Java课程设计报告---设计一个多功能计算器
- 微信「订阅号助手」 App 正式上线,请尽情吐槽!
- 使用计算机计算教学设计,《用计算器计算》教学设计
- 接收微信公众号数据解析以及逻辑实现代码
- 径向基函数神经网络RBFNN