目录

  • 一 介绍
    • 1.1 Remote Actor
    • 1.2 适用场景
    • 1.3 踩坑点
  • 二 实战
    • 2.1 需求
    • 2.2 Java 版本
      • 2.2.1 效果图
      • 2.2.2 实体类
      • 2.2.3 服务端Actor 处理
      • 2.2.4 服务端配置文件
      • 2.2.5 客服端Actor处理
      • 2.2.6 客服端配置文件
      • 2.2.7 测试
    • 2.3 Scala 版本
      • 2.3.1 效果
      • 2.2.3 服务端Actor处理
      • 2.3.4 客户端Actor处理
      • 2.3.5 测试

参考文章

  • Gitter Chat,Akka 在线交流平台
  • Akka Forums,Akka 论坛
  • Akka in GitHub,Akka 开源项目仓库
  • Akka Official Website,Akka 官网
  • Akka Java API,Akka 应用程序编程接口
  • 《Akka入门与实践》 [加]Jason Goodwin(贾森·古德温)

文章系列

  • Akka 学习(一)Actor 初步认识与环境搭建 已完成
  • Akka 学习(二)第一个入门程序 已完成
  • Akka 学习(三)Actor的基本使用 已完成
  • Akka 学习(四)Remote Actor 已完成
  • Akka 学习(五)消息传递的方式 已完成
  • Akka 学习(六)Actor的监督机制 已完成
  • Akka 学习(七)Actor的生命周期 已完成
  • Akka 学习(八)路由与Dispatcher 已完成
  • Akka 学习(九)Akka Cluster 已完成

Akka 基础篇就此结束了,Akka基础篇主要介绍Akka的基本概念与一些基本术语,使用方式
代码:https://github.com/Eason-shu/Akka

一 介绍

1.1 Remote Actor

虽然Akka在单机上可以运行上百万的Actor,但出于容错、负载均衡、灰度发布、提高并行度等等原因,我们仍然需要能在多个不同的服务器上运行Actor。所以Akka提供了akka-remoting的扩展包,屏蔽底层网络传输的细节,让上层以及其简单的方式使用远程的Actor调度。
Akka Remoting 是一个以点对点方式连接 actor 系统的通信模块,它是 Akka 集群的基础。远程处理的设计由两个(相关的)设计决策驱动:

  1. 相关系统之间的通信是对称的:如果系统 A 可以连接到系统 B,那么系统 B 也必须能够独立连接到系统 A。
  2. 通信系统的角色在连接模式方面是对称的:没有只接受连接的系统,也没有只发起连接的系统。

这些决定的结果是不可能安全地创建具有预定义角色的纯客户端-服务器设置(违反假设 2)。对于客户端-服务器设置,最好使用 HTTP 或 Akka I/O。
重要提示:使用涉及网络地址转换、负载平衡器或 Docker 容器的设置违反了假设 1,除非在网络配置中采取额外步骤以允许相关系统之间的对称通信。在这种情况下,Akka 可以配置为绑定到与用于在 Akka 节点之间建立连接的地址不同的网络地址。请参阅NAT 后面或 Docker 容器中的 Akka。

1.2 适用场景

  • remoting的存在其实是为akka cluster做底层支持的,通常并不会直接去使用remoting的包。但为了了解cluster的底层原理,还是有必要看下remoting。
  • 同时,remoting被设计为Peer-to-Peer而非Client-Server,所以不适用于基于后者的系统开发,比如我们无法在一个provider为local的Actor里去查找一个remote actor发送消息,必须两者均为remote actor,才满足对等。

1.3 踩坑点

  • Akka版本需要与Scala版本匹配

maven仓库地址:

注意版本匹配,不然会疯狂报错,运行不起来
我的版本:
scala:2.13.0
Akka版本:2.13

  • 依赖
<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 https://maven.apache.org/xsd/maven-4.0.0.xsd"><modelVersion>4.0.0</modelVersion><groupId>com.hc</groupId><artifactId>ActorDemo03</artifactId><version>0.0.1-SNAPSHOT</version><name>ActorDemo03</name><description>Demo project for Spring Boot</description><properties><java.version>1.8</java.version><scala.version>2.11.7</scala.version></properties><dependencies><dependency><groupId>log4j</groupId><artifactId>log4j</artifactId><version>1.2.17</version></dependency><!-- https://mvnrepository.com/artifact/com.typesafe.akka/akka-actor --><dependency><groupId>com.typesafe.akka</groupId><artifactId>akka-actor_2.13</artifactId><version>2.5.23</version></dependency><dependency><groupId>com.typesafe.akka</groupId><artifactId>akka-remote_2.13</artifactId><version>2.5.23</version></dependency><dependency><groupId>com.typesafe.akka</groupId><artifactId>akka-testkit_2.13</artifactId><version>2.5.23</version></dependency><dependency><groupId>org.scala-lang.modules</groupId><artifactId>scala-java8-compat_2.11</artifactId><version>1.0.2</version></dependency><dependency><groupId>junit</groupId><artifactId>junit</artifactId><version>4.13.2</version><scope>test</scope></dependency><dependency><groupId>org.testng</groupId><artifactId>testng</artifactId><version>RELEASE</version><scope>compile</scope></dependency></dependencies><build><plugins><plugin><groupId>org.apache.maven.plugins</groupId><artifactId>maven-compiler-plugin</artifactId><configuration><source>8</source><target>8</target></configuration></plugin></plugins></build></project>

二 实战

2.1 需求

  • 我们还将创建一个数据库客户端,用于展示如何请求服务器,以及如何从远程Actor中获取Future。
  • 服务器端的服务接收到客户端的请求后将返回Future。
  • 这样我们就已经编写了一个可以使用的键值存储数据库(和redis很类似)以及一个可以使用该数据库的远程客户端。

2.2 Java 版本

2.2.1 效果图

  • 服务端启动前

  • 客服端启动

  • 服务端收到请求后

2.2.2 实体类

  • SetRequest
package pojo;import java.io.Serializable;/*** @description: 设置消息* @author: shu* @createDate: 2022/11/28 11:51* @version: 1.0*/
public class SetRequest implements Serializable {public final String key;public final Object value;public SetRequest(String key, Object value) {this.key = key;this.value = value;}}
  • GetRequest
package pojo;import java.io.Serializable;/*** @description: 获取消息* @author: shu* @createDate: 2022/11/28 11:52* @version: 1.0*/
public class GetRequest implements Serializable {public final String key;public GetRequest(String key) {this.key = key;}
}
  • 错误消息
package pojo;import java.io.Serializable;/*** @description:* @author: shu* @createDate: 2022/11/28 11:52* @version: 1.0*/
public class KeyNotFoundException extends Exception implementsSerializable {public final String key;public KeyNotFoundException(String key) {this.key = key;}
}

2.2.3 服务端Actor 处理


import akka.actor.*;
import akka.event.Logging;
import akka.event.LoggingAdapter;
import akka.japi.pf.ReceiveBuilder;
import pojo.GetRequest;
import pojo.KeyNotFoundException;
import pojo.SetRequest;import java.util.HashMap;
import java.util.Map;/*** @description:* @author: shu* @createDate: 2022/11/28 12:31* @version: 1.0*/
public class RequestActor extends AbstractActor {LoggingAdapter log = Logging.getLogger(getContext().system(), this);protected final Map<String, Object> map = new HashMap<>();@Overridepublic void preStart() throws Exception {log.info("ToFindRemoteActor is starting");}@Overridepublic Receive createReceive() {return  ReceiveBuilder.create()// 设置消息.match(SetRequest.class, message -> {// 打印消息log.info("Received Set request: {}", message.key);// 缓存消息map.put(message.key, message.value);// 回应消息sender().tell(new Status.Success(message.key), self());})// 得到消息.match(GetRequest.class, message -> {// 打印日志log.info("Received Get request: {}", message.key);// 获取消息Object value = (Object) map.get(message.key);Object response = (value!= null)? value: new Status.Failure(new KeyNotFoundException(message.key));// 响应消息sender().tell(response, self());})// 未找到消息.matchAny(o ->sender().tell(new Status.Failure(new ClassNotFoundException()), self())).build();}/*** 测试* @param args*/public static void main(String[] args) {//        Config config = ConfigFactory.parseString(
//                        "akka.remote.netty.tcp.port=" + 2551)
//                .withFallback(ConfigFactory.load("application.conf"));// Create an Akka systemActorSystem system = ActorSystem.create("akkademy");// Create an actorActorRef ref = system.actorOf(Props.create(RequestActor.class), "akkademy-db");System.out.println(ref);}}

2.2.4 服务端配置文件

akka {stdout-loglevel = "DEBUG"loglevel = "DEBUG"actor {provider = "akka.remote.RemoteActorRefProvider"
}
remote {enabled-transports = ["akka.remote.netty.tcp"]netty.tcp {hostname = "127.0.0.1"
}
}
log-sent-messages = on
log-received-messages = on
}

2.2.5 客服端Actor处理

package client;import akka.actor.ActorRef;
import akka.actor.ActorSelection;
import akka.actor.ActorSystem;
import akka.pattern.AskableActorSelection;
import akka.util.Timeout;
import pojo.GetRequest;
import pojo.SetRequest;import java.util.concurrent.CompletionStage;
import java.util.concurrent.TimeUnit;import static scala.compat.java8.FutureConverters.toJava;/**
* @description:
* @author: shu
* @createDate: 2022/11/28 16:10
* @version: 1.0
*/
public class JClient {private final ActorSystem system = ActorSystem.create("LocalSystem");private final ActorSelection remoteDb;public JClient(String remoteAddress) {remoteDb = system.actorSelection("akka.tcp://akkademy@" +remoteAddress + "/user/akkademy-db");}/**
* 缓存消息
* @param key
* @param value
* @return
*/public CompletionStage set(String key, Object value) {return toJava(new AskableActorSelection(remoteDb).ask(new SetRequest(key, value), Timeout.apply(5000, TimeUnit.SECONDS)));}/**
* 获取缓存消息
* @param key
* @return
*/public CompletionStage get(String key){return   toJava(new AskableActorSelection(remoteDb).ask(new GetRequest(key), Timeout.apply(5000, TimeUnit.SECONDS)));}}
import akka.actor.ActorRef;
import akka.actor.ActorSelection;
import akka.actor.ActorSystem;
import client.JClient;
import org.junit.Test;
import pojo.SetRequest;import java.util.concurrent.CompletableFuture;
public class AkkademyDbTest {/*** 测试注意需要放在不同的两个项目进行测试,不然会Caused by: java.net.BindException: Address already in use: bind* @throws Exception*/@Testpublic void itShouldSetRecord() throws Exception {JClient client = new JClient("127.0.0.1:2552");client.set("123", 123);Integer result = (Integer) ((CompletableFuture) client.get("123")).get();System.out.println("获取的结果:"+result);assert(result == 123);}}

2.2.6 客服端配置文件

akka {stdout-loglevel = "DEBUG"loglevel = "DEBUG"actor {provider = "akka.remote.RemoteActorRefProvider"
}
remote {enabled-transports = ["akka.remote.netty.tcp"]netty.tcp {hostname = "127.0.0.1"port = 0
}
log-sent-messages = on
log-received-messages = on
}
}

2.2.7 测试

注意:需要放在两个不一样的配置项目中

  1. 服务端项目启动,等待请求的到来,注意配置文件
  2. 客服端项目启动,发送AKKA请求给服务端
  3. 服务端收到客服端的请求,缓存请求数据,把缓存结果返回给客服端

2.3 Scala 版本

2.3.1 效果

  • 启动服务前

  • 客户端启动

  • 服务端收到请求

2.2.3 服务端Actor处理

import akka.actor.{Actor, Status}
import akka.event.{Logging, LoggingAdapter}import scala.collection.convert.ImplicitConversions.`map AsJavaMap`
import scala.collection.mutable/*** @description:* @author: shu* @createDate: 2022/11/28 12:41* @version: 1.0*/
class ScalaRequest extends Actor {protected val log: LoggingAdapter = Logging.getLogger(context.system, this)val map: mutable.Map[String, Object] = new mutable.HashMap[String, Object]override def receive = {case SetRequest(key, value) =>log.info("received SetRequest - key: {} value: {}", key, value)map.put(key, value)sender() ! Status.Successcase GetRequest(key) =>log.info("received GetRequest - key: {}", key)val response: Option[Object] = map.get(key)response match{case Some(x) => sender() ! xcase None => sender() ! Status.Failure(new KeyNotFoundException(key))}case o => Status.Failure(new ClassNotFoundException)}}case class SetRequest(key: String, value: Object)
case class GetRequest(key: String)
case class KeyNotFoundException(key: String) extends Exception

配置文件跟Java一样

2.3.4 客户端Actor处理

import akka.actor.ActorSystem
import akka.pattern.ask
import akka.util.Timeoutimport scala.concurrent.duration.DurationInt
import scala.language.postfixOps/*** @description:* @author: shu* @createDate: 2022/12/1 11:43* @version: 1.0*/
class SClient(remoteAddress: String) {private implicit val timeout = Timeout(2 seconds)private implicit val system = ActorSystem("LocalSystem")private val remoteDb = system.actorSelection(s"akka.tcp://akkademy@$remoteAddress/user/akkademy-db")def set(key: String, value: Object) = {remoteDb ? SetRequest(key, value)}def get(key: String) = {remoteDb ? GetRequest(key)}
}case class SetRequest(key: String, value: Object)
case class GetRequest(key: String)
case class KeyNotFoundException(key: String) extends Exception
import scala.concurrent.duration.DurationInt
import scala.concurrent.Await
import scala.language.postfixOps/*** @description:* @author: shu* @createDate: 2022/12/1 11:44* @version: 1.0*/object Main extends App {val client = new SClient("127.0.0.1:2552")client.set("123", new Integer(123))val futureResult = client.get("123")val result = Await.result(futureResult, 10  seconds)}

2.3.5 测试

注意:需要放在两个不一样的配置项目中

  1. 服务端项目启动,等待请求的到来,注意配置文件
  2. 客服端项目启动,发送AKKA请求给服务端
  3. 服务端收到客服端的请求,缓存请求数据,把缓存结果返回给客服端

Akka 学习(四)Remote Actor相关推荐

  1. akka学习教程(十四) akka分布式实战

    akka系列文章目录 akka学习教程(十四) akka分布式实战 akka学习教程(十三) akka分布式 akka学习教程(十二) Spring与Akka的集成 akka学习教程(十一) akka ...

  2. akka学习教程(十三) akka分布式

    akka系列文章目录 akka学习教程(十四) akka分布式实战 akka学习教程(十三) akka分布式 akka学习教程(十二) Spring与Akka的集成 akka学习教程(十一) akka ...

  3. (转)Akka学习笔记

    Akka学习笔记系列文章: <Akka学习笔记:ACTORS介绍> <Akka学习笔记:Actor消息传递(1)> <Akka学习笔记:Actor消息传递(2)> ...

  4. Akka 学习(九)Akka Cluster

    参考文章 Gitter Chat,Akka 在线交流平台 Akka Forums,Akka 论坛 Akka in GitHub,Akka 开源项目仓库 Akka Official Website,Ak ...

  5. Akka系列---什么是Actor

    本文已.Net语法为主,同时写有Scala及Java实现代码 严肃的说,演员是一个广泛的概念,作为外行人我对Actor 模型的定义: Actor是一个系统中参与者的虚拟人物,Actor与Actor之间 ...

  6. C#多线程学习(四) 多线程的自动管理(线程池) (转载系列)——继续搜索引擎研究...

    在多线程的程序中,经常会出现两种情况: 一种情况:   应用程序中,线程把大部分的时间花费在等待状态,等待某个事件发生,然后才能给予响应                   这一般使用ThreadPo ...

  7. python学习四(处理数据)

    python学习四(处理数据) head first python中的一个数据处理的例子 有四个U10选手的600米成绩,请取出每个选手跑的最快的3个时间.以下是四位选手的9次成绩 James 2-3 ...

  8. PyTorch框架学习四——计算图与动态图机制

    PyTorch框架学习四--计算图与动态图机制 一.计算图 二.动态图与静态图 三.torch.autograd 1.torch.autograd.backward() 2.torch.autogra ...

  9. Docker学习四:Docker 网络

    前言 本次学习来自于datawhale组队学习: 教程地址为: https://github.com/datawhalechina/team-learning-program/tree/master/ ...

最新文章

  1. 汇总 Linux下获取详细硬件信息的工具:Dmidecode命令详解
  2. iovec结构体定义及使用
  3. 嵌入式linux开发环境搭建——VirtualBox虚拟机网络环境解析
  4. Ajax的异步,是鸡肋还是鸡排?
  5. 序列代码UVa 111 History Grading (最长公共子序列)
  6. oracle pls 00905,Oracle数据库存储过程出错了!大神帮忙看下什么问题!急急急!...
  7. 指数函数图像怎么画?
  8. 结构梁配筋最牛插件_结构分析|结构抗震概念——强柱弱梁
  9. JavaFX屏幕截图工具ScreenCapture
  10. PDA用ActiveSync同步上网 方法
  11. 跳转到高德地图或百度地图或高德网页导航
  12. “阻塞(pend)”与“挂起(suspend)”的区别?
  13. HDU-1814 Peaceful Commission (2-SAT暴力模板 暴力染色+字典序最小)
  14. 等了3个月终于来啦!传智播客C/C++视频教程开始更新喽~
  15. C语言 输入一个5行5列的数组。1.求数组主对角线上元素的和。2.求出辅对角线上元素的积。3.找出主对角线上最大的值及其位置
  16. 冰刃·笔记 | 勒索病毒的10%和90%等式
  17. 啊哈添柴挑战Java1826. 顺序输出(简单)
  18. c语言有趣代码,分享一段有趣的小代码
  19. java判断是平年还是闰年
  20. iis/apache + php5 + mysql5_Windows下IIS6/Apache2.2.4+MySQL5.2+PHP5.2.6安装配置方法

热门文章

  1. 怎样在表格中选出同一类_怎样将一个excel表格里头的多个同一个名称下面的多个数据筛选出来...
  2. html5 微信墙,HTML5服务器端推送事件 解决PHP微信墙推送问题
  3. 二维码生成器和二维码分享
  4. 华为鸿蒙系统使用技巧,【图片】华为鸿蒙系统的厉害之处在于 你可能非用不可 !【手机吧】_百度贴吧...
  5. steam俄罗斯钓鱼4游戏服务器未响应,steam游戏推荐:《俄罗斯钓鱼4》一款真实的钓鱼游戏...
  6. 关于java 计算器设计,Java课程设计报告---设计一个多功能计算器
  7. 微信「订阅号助手」 App 正式上线,请尽情吐槽!
  8. 使用计算机计算教学设计,《用计算器计算》教学设计
  9. 接收微信公众号数据解析以及逻辑实现代码
  10. 径向基函数神经网络RBFNN