1.概述

转载:Akka并发编程设计

2. 响应式系统设计

Akka 被称为是一个响应式平台,更具体的说,它是 Typesafe 响应式平台的一部分。响应式宣言中包含了 4 个准则,也可以说是设计目标:灵敏性、伸缩性、容错性以及事件驱动设计。

灵敏性

应用程序应该尽可能快的对请求做出响应。为了尽快向用户返回响应,在顺序获取数据和并行获取数据之间选择的话,始终应该优先选择并行获取数据。如果可能出现错误,应该立即返回,将问题通知用户,不要让用户等待直到超时。

伸缩性

应用程序应该能够根据不同的工作负载进行伸缩扩展(尤其是通过增加计算资源来进行扩展)。如果在虚拟机上运行内存数据库,那么添加另一个虚拟节点就可以将所有的查询请求分布到两台虚拟服务器上,将可能的吞吐量增加至原来的两倍。添加额外的节点应该能够几乎线性地提高系统的性能。
 
 增加一个内存数据库的节点后,还可以将数据分为两半,并将其中的一半移至新的节点,这样就能够将内存容量提高至原来的两倍。添加节点应该能够几乎线性地提高内存容量。

容错性

如果系统的某个组件发生错误,对与该组件无关的请求不应该产生任何影响。错误是难以避免的,因此应该将错误造成的影响限制在发生错误的组件内。

事件驱动/消息驱动

使用消息而不直接进行方法调用提供了一种帮助我们满足另外 3 个响应式准则的方法。消息驱动的系统着重于控制何时、何地以及如何对请求做出响应,允许做出响应的组件进行路由以及负载均衡。
 
 由于异步的消息驱动系统只有在真正需要时才会消耗资源(比如线程),因此它对系统资源的利用更高效。消息也可以被发送到远程机器(位置透明)。因为要发送的消息暂存在 Actor 外的消息队列中,并从该队列中被发出,所以就能够通过监督机制使得发生错误的系统进行自我恢复。

4 个响应式准则之间并不是完全独立的。为了满足某个准则而采取的方法通常也对满足其他准则有所帮助。例如,如果发现某个服务响应速度较慢,我们可能会在短时间内停止再向该服务发送请求,等待其恢复正常,并立即向用户返回错误信息。这样做降低了响应慢的服务不堪重负直接崩溃的风险,因此也提高了系统的容错性。除此之外,我们立即告知了用户系统发生的问题,也就改善了系统的响应速度,如图所示:

3.剖析 Actor

一个简单的例子,简单的构建一个 Actor ,这个 Actor 接收 “Ping” ,返回字符串 “Pong”作为响应。

package com.akka.study.blog.lllpan;import akka.actor.AbstractActor;
import akka.actor.Status;
import akka.event.Logging;
import akka.event.LoggingAdapter;
import akka.japi.pf.ReceiveBuilder;
import scala.PartialFunction;public class JavaPongActor extends AbstractActor {protected final LoggingAdapter log = Logging.getLogger(context().system(), this);@Overridepublic Receive createReceive() {return ReceiveBuilder.create().matchEquals("Ping", message -> {sender().tell("Pong", self());log.info("message:" + message);log.info("sender:" + sender().path());log.info("self:" + self());}).matchAny(other -> {sender().tell(new Status.Failure(new Exception("unknown message")), self());log.info("other:" + other);}).build();}}

AbstractActor : 这个 Java8 特有的 API,利用了 Lamdba 特性。UntypedActor 也可以作为基类来继承,但是这个类比较旧。在 UntypeActor 的 API 中,会得到一个对象,然后必须用 if 语句对其进行条件判断;但是在 Java8 的 API 可以通过模式匹配,表达力更强。

Receive : AbstractActor 类有一个 receive 方法,其子类必须实现这个方法或者在构造方法中调用这个方法。receive 方法返回的类型是 PartialFuncation ,这个类型来自于 Scala 的 API 。在 Java 中并没有提供任何原生方法来构造 Scala 的 PartialFunction,因此 Akka 为我们提供了一个抽象的构造方法类 ReceiveBuilder ,用于生产 PartialFunction 作为返回值。

ReceiveBuilder:连续调用 ReceiveBuilder 的方法,为所有需要匹配处理的消息输入消息类型提供响应方法的描述。然后调用 build() 方法生成所需要的返回值 PartialFunction。

Match : 用于匹配消息类型。match 函数从上至下匹配,所以可以先定义特殊情况,最后定义一般情况。

match(final Class<? extends P> type, FI.UnitApply<? extends P> apply)

描述了对于任何尚未匹配的该类型的实例,以及响应行为。

match(final Class<P> type, final FI.TypedPredicate<P> predicate,final FI.UnitApply<P> apply)

描述了对于 predicate 条件函数为真的某特定类型的消息,应该如何响应。

matchAny(final FI.UnitApply<Object> apply)

该函数匹配所有尚未匹配的消息,通常来说,最佳的事件是返回错误消息,或者记录错误信息到日志。

向 sender() 返回消息:调用了 sender() 方法后,就可以返回所收到消息的响应了。响应的对象既可以是 Actor,也可以是来源于 Actor 系统外部的请求。第一种情况相当直接:上面的代码所示,返回的消息会直接发送到该 Actor 的收件信箱中。

tell():sender()函数会返回一个 ActorRef。在 sender().tell() 中,tell() 是最基本的单项消息传输模式。第一个参数是要发送至对方邮箱的消息,第二个参数是希望对方 Actor 看到的发送者。 ActorRef.noSender()则表示没有发送者,也就没有返回地址。

在当前最新版本中,AbstractActor 中的一些方法被调整。比如上面书中所说要重写 receive 方法,但是新版中改为必须要重写 createReceive 方法,且返回值由之前的 PartialFunction 改为 Receive 。与之对应的 生产 PartialFunction 的 ReceiveBuilder 也做了调整,之前 ReceiveBuilder 中的 match 之类的方法由 static 全部变为非 static。build 方法也被重写。

4. 创建 Actor

访问 Actor 的方式和访问普通对象的方式有所不同,我们从来不会得到 Actor 的实例,也不调用 Actor 的方法,也不直接改变 Actor 的状态,反之,只会向 Actor 发送消息。通过使用基于消息的机制,可以相当完整的将 Actor 给封装起来,如果只通过消息通讯,那就永远不会需要获取 Actor 的实例,只需要一种机制来支持向 Actor 发送消息并接受响应。—— ActorRef

在 Akka 中,这个指向 Actor 实例的引用叫做 ActorRef 。ActorRef 是一个无类型的引用,将其指向的 Actor 封装起来,提供了更高层的抽象,并且给用户提供了一种与 Actor 进行通信的机制。

ActorRef pingref = system.actorOf(Props.create(JavaPongActor.class), "pingActor");

actorOf 方法会生成一个新的 Actor 并返回指向这个 Actor 的引用。

4.1 Props

为了能够将 Actor 的实例封装起来,不让其被外部直接访问。我们将所有构造函数的参数传给一个 Props 实例,Props 允许我们传入 Actor 类型以及一个可变参数列表。

def create(clazz: Class[_], args: AnyRef*): Props = new Props(deploy = Props.defaultDeploy, clazz = clazz, args = args.toList)

actorOf 创建一个 Actor,并返回该 Actor 的引用 ActorRef,除此之外,还可以使用 actorSelection 来获取 Actor 的 ActorRef。每个 Actor 在创建的时候都会有一个路径。可以通过 ActorRef.path 查看路径,如:

ActorRef pingref = system.actorOf(Props.create(JavaPongActor.class), "pingActor");
System.out.println(pingref.path());

输出:akka://PongPing/user/pingActor。该路径是一个 URL,它甚至可以指向使用 akka.tcp 协议的远程 Actor。如果知道了 Actor 的路径,就可以使用 actorSelection 来获取指向该 Actor 引用的 ActorSelection ,无论该 Actor 在本地还是在远程。

4.2 ActorSelection

ActorSelection 也是一个指向 Actor 的引用。作用和 ActorRef 一样,同样可以使用 ActorSeletion 让 Actor 之间互相通信。这也是对 Akka 位置透明性的最好诠释。

ActorRef pingref = system.actorOf(Props.create(JavaPongActor.class), "pingActor");
ActorSelection selection = system.actorSelection(pingref.path());

5.Promise、Future 和事件驱动的编程模型

5.1 阻塞IO

几乎每个开发者都很熟悉阻塞式的代码。进行 IO 操作时,编写的都是阻塞式的代码。当我们调用一个同步的 API 时,调用的方法不会立即返回:应用程序会等待该调用执行完成。

例如,如果发起一个 HTTP 请求的话,只有在请求完成后,才会收到返回的响应对象。由于发起调用的线程会暂停执行并等待,因此等待 IO 操作完成的代码都是阻塞的,在 IO 操作完成之前,发起调用的线程无法进行任何其他操作。


使用多线程或线程池来处理阻塞IO时,需要考虑到在同一个CPU核心中运行多个线程时, 操作系统需要不断切换线程上下文, 保证所有的线程都能分配到CPU时间片。可能会遇到如下问题:

代码没有在返回类型中明确表示错误;
代码没有在返回类型中明确表示延时;
阻塞模型的吞吐量受到线程池大小的限制;
创建并使用许多线程会耗费额外的时间用于上下文切换,影响系统性能。

5.2 事件驱动

事件驱动描述的是:**在发生某些特定事件时,就执行某些对应的代码。**基于事件驱动的模型, 我们需要在代码中用不同的方法来表示结果。用一个占位符来表示最终将会返回的结果: Future/CompletableFuture。

5.3 Future的使用与理解

方法返回 Future/CompletableFuture 之后,我们只得到了一个承诺,表示真正的值最终会返回到 Future 中。我们并不希望发起调用的线程等待返回结果,而是希望其在真正的结果返回后再执行特定的操作 (打印到控制台) 。 在一个事件驱动的系统中, 需要做的就是描述某个事件发生时需要执行的代码。在 Actor 中,描述接收到某个消息时进行的操作。同样地,在Future 中,我们描述 Future 的值真正可用时进行的操作。在 Java 8 中,使用 thenRun 来注册事件成功完成时需要执行的代码;而在 Scala 中,使用 onComplete

package com.akka.study.blog.lllpan;import akka.actor.ActorRef;
import akka.actor.ActorSystem;
import akka.actor.Props;
import org.junit.Test;
import scala.compat.java8.FutureConverters;
import scala.concurrent.Future;import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CompletionStage;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.TimeUnit;import static akka.pattern.Patterns.ask;
import static org.junit.Assert.*;public class JavaPongActorTest {ActorSystem system = ActorSystem.create();ActorRef actorRef = system.actorOf(Props.create(JavaPongActor.class), "BruceWillis");/*** 成功状态** @throws Exception*/@Testpublic void shouldReplyToPingWithPong() throws Exception {/* 向Actor询问其对于某个消息的响应*      param1:消息发送至的 Actor 引用*      param2:想要发送给 Actor 的消息;*      parma3:Future 的超时参数:等待结果多久以后就认为询问失败。*/Future sFuture = ask(actorRef, "Ping", 1000);// 将scala Future 转化为 CompletableFuturefinal CompletionStage<String> cs = FutureConverters.<Object>toJava(sFuture);final CompletableFuture<String> jFuture = (CompletableFuture<String>) cs;assertEquals("Pong", jFuture.get(1000, TimeUnit.MILLISECONDS));}/*** 失败状态** @throws Exception*/@Test(expected = ExecutionException.class)public void shouldReplyToUnknownMessageWithFailure() throws Exception {/* 向Actor询问其对于某个消息的响应*      param1:消息发送至的 Actor 引用*      param2:想要发送给 Actor 的消息;*      parma3:Future 的超时参数:等待结果多久以后就认为询问失败。*/Future sFuture = ask(actorRef, "unknown", 1000);// 将scala Future 转化为 CompletableFuturefinal CompletionStage<String> cs = FutureConverters.<Object>toJava(sFuture);final CompletableFuture<String> jFuture = (CompletableFuture<String>) cs;jFuture.get(1000, TimeUnit.MILLISECONDS);}//Future Examples@Testpublic void shouldPrintToConsole() throws Exception {askPong("Ping").thenAccept(x -> System.out.println("replied with: " + x));Thread.sleep(100);//no assertion - just prints to console. Try to complete a CompletableFuture instead.}@Testpublic void shouldTransform() throws Exception {char result = (char) get(askPong("Ping").thenApply(x -> x.charAt(0)));assertEquals('P', result);}/*** There is was a bug with the scala-java8-compat library 0.3.0 - thenCompose throws exception* https://github.com/scala/scala-java8-compat/issues/26* <p>* I confirmed fixed in 0.6.0-SNAPSHOT (10 months later). Just in time for publishing!*/@Testpublic void shouldTransformAsync() throws Exception {CompletionStage cs = askPong("Ping").thenCompose(x -> askPong("Ping"));assertEquals(get(cs), "Pong");}@Testpublic void shouldEffectOnError() throws Exception {askPong("cause error").handle((x, t) -> {if (t != null) {System.out.println("Error: " + t);}return null;});}@Testpublic void shouldRecoverOnError() throws Exception {CompletionStage<String> cs = askPong("cause error").exceptionally(t -> {return "default";});String result = (String) get(cs);}//首先检查 exception 是否为 null ,如果为 null,就返回包含结果的 Future,否则返回重试的 Future。最后调用 thenCompose 将嵌套的 CompletionStage 扁平化@Testpublic void shouldRecoverOnErrorAsync() throws Exception {CompletionStage<String> cf = askPong("cause error").handle((pong, ex) -> ex == null? CompletableFuture.completedFuture(pong): askPong("Ping") // 为null的情况重试).thenCompose(x -> x);assertEquals("Pong", get(cf));}@Testpublic void shouldPrintErrorToConsole() throws Exception {askPong("cause error").handle((x, t) -> {if (t != null) {System.out.println("Error: " + t);}return null;});Thread.sleep(100);}//Helperspublic Object get(CompletionStage cs) throws Exception {return ((CompletableFuture<String>) cs).get(1000, TimeUnit.MILLISECONDS);}public CompletionStage<String> askPong(String message) {Future sFuture = ask(actorRef, message, 1000);final CompletionStage<String> cs = FutureConverters.<Object>toJava(sFuture);return cs;}
}

这是一段异步的代码。Future或CompletableFuture 成功时会返回一个类型为 Object 的值,失败返回 Throwable 。

  • 对返回结果执行代码 一旦结果返回就执行一个事件,可以用 thenAccept 来操作返回的结果,
  • 对返回结果进行转换 最常见的一种用例就是在处理响应之前先异步的对其进行转换,thenApply 操作会返回个新的 Future.
  • 对返回结果进行异步转换 有时候进行异步调用,在得到结果后,进行另一个异步调用,可以使用 thenCompose。
  • 在失败的情况下使用handle,可参考方法shouldRecoverOnErrorAsync。

// Scala版本
class ScalaAskExamplesTest extends FunSpecLike with Matchers {val system: ActorSystem = ActorSystem()implicit val timeout: Timeout = Timeout(5 seconds)val pongActor: ActorRef = system.actorOf(Props(classOf[ScalaPongActor]))describe("Pong actor") {it("should respond with Pong") {// 向 Actor 请求一条响应消息val future = pongActor ? "Ping"/** Actor 的返回值是没有类型的,因此我们接收到的结果是 Future[AnyRef]。* 所以应调用 future.mapTo[String]将 Future 的类型转换成我们需要的结果类型。*/val result = Await.result(future.mapTo[String], 1 second)assert(result == "Pong")}it("should fail on unknown message") {val future = pongActor ? "unknown"intercept[Exception] {Await.result(future.mapTo[String], 1 second)}}}describe("FutureExamples") {import scala.concurrent.ExecutionContext.Implicits.globalit("should print to console") {askPong("Ping").onSuccess({case x: String => println("replied with: " + x)})Thread.sleep(100)}it("should transform") {val f: Future[Char] = askPong("Ping").map(x => x.charAt(0))val c = Await.result(f, 1 second)c should equal('P')}/*** Sends "Ping". Gets back "Pong"* Sends "Ping" again when it gets "Pong"*/it("should transform async") {val f: Future[String] = askPong("Ping").flatMap(x => {assert(x == "Pong")askPong("Ping")})val c = Await.result(f, 1 second)c should equal("Pong")}//doesn't actually test anything - demonstrates an effect. next test shows assertion.it("should effect on failure") {askPong("causeError").onFailure {case e: Exception => println("Got exception")}}/*** similar example to previous test, but w/ assertion*/it("should effect on failure (with assertion)") {val res = Promise()askPong("causeError").onFailure {case e: Exception =>res.failure(new Exception("failed!"))}intercept[Exception] {Await.result(res.future, 1 second)}}it("should recover on failure") {val f = askPong("causeError").recover({case t: Exception => "default"})val result = Await.result(f, 1 second)result should equal("default")}it("should recover on failure async") {val f = askPong("causeError").recoverWith({case t: Exception => askPong("Ping")})val result = Await.result(f, 1 second)result should equal("Pong")}it("should chain together multiple operations") {val f = askPong("Ping").flatMap(x => askPong("Ping" + x)).recover({case _: Exception => "There was an error"})val result = Await.result(f, 1 second)result should equal("There was an error")}it("should be handled with for comprehension") {val f1 = Future {4}val f2 = Future {5}val futureAddition =for {res1 <- f1res2 <- f2} yield res1 + res2val additionResult = Await.result(futureAddition, 1 second)assert(additionResult == 9)}it("should handle a list of futures") {val listOfFutures: List[Future[String]] = List("Pong", "Pong", "failure").map(x => askPong(x))val futureOfList: Future[List[String]] = Future.sequence(listOfFutures)}}def askPong(message: String): Future[String] = (pongActor ? message).mapTo[String]
}

5.4 链式操作

上面的每个方法都会返回一个新的 Future ,可以应用函数式的风格把多个操作组合起来,在组合的过程中无需处理异常。我们可以把注意力放在成功的情况上,在链式操作的结尾再收集错误。

askPong("Ping").thenCompose(x -> askPong("Ping" + x)).handle((x, t) -> {if (t != null) {return "default";} else {return x;}});

执行操作链中的任一操作时发生的错误都可以作为链的末端发生的错误来处理。这样就形成了一个很有效的操作管道,无论是哪个操作导致了错误,都可以在最后来处理异常。我们可以集中注意力描述成功的情况,无需在链的中间做额外的错误检查。可以在最后单独处理错误。

【Akka】Akka并发编程设计相关推荐

  1. java并发编程:设计原则与模式_java编程设计模式一——策略模式

    定义: 策略模式定义了算法族,分别封装起来,让它们之间相互替代.此模式让算法的变化独立于使用算法的客户. 问题描述: 建立一个鸭子Duck的父类,它有很多子类,Duck都会游泳,但是有鸭子会飞,有的鸭 ...

  2. 一文搞懂go并发编程设计原理

    前言 主要学习其设计原则,大体流程,权衡利弊 不要纠结于部分难懂的实现细节,因为不同的人对相同接口的实现细节不一样,就算是相同的人实现两次也可能不一样 context context的作用主要有两个: ...

  3. java akka 教程_Akka与并发编程模型

    Akka是一种并发编程模型的框架,其官网为http://akka.io.提供Java版本和Scala版本的API. 从学习或者使用的角度来说,我们首先要说明的是,Akka的并发编程模型(流水线模式)与 ...

  4. Akka框架——第一节:并发编程简介

    本节主要内容: 1. 重要概念 2. Actor模型 3. Akka架构简介 多核处理器的出现使并发编程(Concurrent Programming)成为开发人员必备的一项技能,许多现代编程语言都致 ...

  5. 15. Scala并发编程模型Akka

    15.1 Akka介绍 1) Akka是Java虚拟机JVM平台上构建高并发.分布式和容错应用的工具包和运行时,可以理解成Akka是编写并发程序的框架 2) Akka用Scala语言写成,同时提供了S ...

  6. Scala进阶之路-并发编程模型Akka入门篇

    Scala进阶之路-并发编程模型Akka入门篇 作者:尹正杰 版权声明:原创作品,谢绝转载!否则将追究法律责任. 一.Akka Actor介绍 1>.Akka介绍 写并发程序很难.程序员不得不处 ...

  7. 最新天猫面试题(含总结):线程池+并发编程+分布式设计+中间件

    最新天猫面试题(含总结):线程池+并发编程+分布式设计+中间件 https://my.oschina.net/u/3892...

  8. 最新天猫Java面试题(含总结):线程池+并发编程+分布式设计+中间件

    一面: HashMap实现原理,ConcurrentHashMap实现原理 红黑树,为什么允许局部不平衡 TCP,UDP区别,为什么可靠和不可靠 一次HTTP请求的全过程,包括域名解析.定位主机等 T ...

  9. qt 5编程入门(第2版)_《C++并发编程实战第2版》第六章:设计基于锁的并发数据结构(1/3)...

    本章主要内容 设计并发数据结构的含义 设计指南 并发数据结构的示例实现 在上一章中我们了解了底层原子操作和内存模型.本章我们先把底层的细节放一放(尽管在第7章我们将需要它们),探讨一下数据结构. 为编 ...

最新文章

  1. Windows 8 动手实验系列教程 实验8:Windows应用商店API
  2. html5平板电脑,Html5添加支持桌面、移动触摸手机和平板电脑的Lightbox插件教程
  3. 存储过程实现邮件发送
  4. caffe+vs2013+window10+GPU(CPU)配置
  5. 17.3.10--关于C元的变量类型所占字节问题和类型转化
  6. ab性能测试工具使用
  7. 从“果链”到新能源,立讯精密的B端生意成色几何?
  8. 教你如何做好移动支付行业
  9. 好家伙,花了一个月时间写了个网易云,以后再也不用充VIP了!
  10. 鸿蒙系统桌面建文件夹,怎样把桌面上的文件放在一个文件夹里
  11. 打开outlook显示服务器内存不足,Outlook2013无法打开邮箱,报错提示可用内存不足...
  12. 十二个常见的Web安全漏洞总结及防范措施
  13. HTML-form表单+iframe
  14. 用RotateDrawable实现网易云音乐唱片机效果
  15. 腾讯进“鸡”加持智能网络,发布Supermind
  16. App首屏接口性能优化
  17. 简单记录下手机root 过程
  18. folly库安装(2)openssl升级、python3.8安装
  19. 大学计算机基础教学大纲,大学计算机基础教学大纲
  20. 圆锥误差补偿多子样算法

热门文章

  1. 两道小学生的题----1000: 梦里的难题、1002: 拳皇
  2. YOLOv7姿态估计pose estimation(姿态估计+目标检测+跟踪)
  3. 爬虫开发02--数据解析--xpath(首选)
  4. 数值分析各种算法C语言
  5. js免费刷流量软件工具源码
  6. 线搜索(line search)方法
  7. C++实现DES加密解密算法
  8. php手游服务端开发教程,【手游服务端】梦想海贼王 卡牌系列一键端服务端游戏源码+教程...
  9. 程序员把妹指南之电脑配置篇
  10. COUNTA函数功能是返回参数列表中非空值的单元格个数