与Actor集成

为了将流的元素作为消息传递给一个普通的actor,你可以在mapAsync里使用ask或者使用Sink.actorRefWithAck。

消息发送给流,可以通过Source.queue或者通过由Source.actorRef物化的ActorRef 。

mapAsync + ask

将流中元素的某些处理委托给actor的一个好方法是在mapAsync中使用ask。流的背压由ask的Future来维护,并且与mapAsync阶段的parallelism相比,actor的邮箱将不会填充更多的消息。

import akka.pattern.ask

implicit val askTimeout = Timeout(5.seconds)

val words: Source[String, NotUsed] =

Source(List("hello", "hi"))

words

.mapAsync(parallelism = 5)(elem => (ref ? elem).mapTo[String])

// continue processing of the replies from the actor

.map(_.toLowerCase)

.runWith(Sink.ignore)

请注意, 在参与者中接收的消息将与流元素的顺序相同, 即并行度不会更改消息的顺序。使用parallelism > 1有性能优势(即使actor一次只处理一条消息),因为在actor完成前一条消息处理时,邮箱已经有一条消息。

actor 必须为来自流的每条消息答复sender()。该答复将完成ask的Future, 它将是从mapAsync发给下游的元素。

class Translator extends Actor {

def receive = {

case word: String =>

// ... process message

val reply = word.toUpperCase

sender() ! reply // reply to the ask

}

}

通过发送 akka.actor.Status.Failure 作为参与者的答复, 以失败来完成流。

如果请求因超时而失败, 则流将以TimeoutException失败完成。如果这不是想要的结果, 你可以在ask Future``上使用recover```。

如果你不关心回复值, 只用它们作为背压信号, 你可以在mapAsync阶段之后使用Sink.ignore, 然后actor实际上是流的一个sink。

同样的模式可以与Actor routers一起使用。然后, 如果不关心发给下游的元素(答复)顺序, 则可以使用mapAsyncUnordered来提高效率。

Sink.actorRefWithAck

sink将流的元素发送到给定的 ActorRef, ActorRef发送背压信号。第一个元素总是 onInitMessage, 然后流等待actor的确认消息, 这意味着actor准备处理元素。它还要求每个流元素后返回确认消息,以便进行回压工作。

如果目标actor终止, 流将被取消。当流成功完成时, onCompleteMessage将被发送到目标actor。当流以失败完成时, 将向目标actor发送akka.actor.Status.Failure消息。

注意

使用Sink.actorRef或从map使用普通的tell或 foreach , 意味着没有来自目标actor的背压信号, 也就是说, 如果actor没有足够快地处理消息,该actor的邮箱将增长, 除非使用设置mailbox-push-timeout-time为0的有界邮箱或使用前面的速率限制阶段。不过,使用Sink.actorRefWithAck或者在mapAsync中使用ask更好。

Source.queue

Source.queue可用于从actor(或流外部运行的任何东西)发送元素给流。元素将被缓冲直到流可以处理它们。可以offer元素给队列,如果有下游的需求,它们将发送给流,否则将缓冲到收到需求的请求为止。

根据定义的 OverflowStrategy, 如果缓冲区中没有可用空间, 它可能会丢弃元素。OverflowStrategy.backpressure策略不支持这种Source类型,也就是说如果填充缓冲的速度比流可以处理的速度快,元素会被丢弃。如果你想要一个背压的actor接口,应当考虑使用Source.queue。

流可以通过发送akka.actor.PoisonPill或akka.actor.Status.Success给actor引用,成功完成。

流可以通过发送akka.actor.Status.Failure给actor引用,失败完成。

当流完成时,actor将终止,并失败或取消下游,也就是说,当发生这种情况时, 你可以观察它得到通知。

与外部服务集成

可以使用mapAsync或mapAsyncUnordered来执行涉及外部基于非流的服务的流转换和副作用。

例如,使用外部电子邮件服务向所选推文的作者发送电子邮件:

def send(email: Email): Future[Unit] = {

// ...

}

我们从推文的作者推特流开始:

val authors: Source[Author, NotUsed] =

tweets

.filter(_.hashtags.contains(akkaTag))

.map(_.author)

假设我们可以使用以下内容查找他们的电子邮件地址:

def lookupEmail(handle: String): Future[Option[String]] =

通过使用 lookupEmail 服务, 使用mapAsync可以将作者流转换为电子邮件地址流:

val emailAddresses: Source[String, NotUsed] =

authors

.mapAsync(4)(author => addressSystem.lookupEmail(author.handle))

.collect { case Some(emailAddress) => emailAddress }

最终,发送电子邮件:

val sendEmails: RunnableGraph[NotUsed] =

emailAddresses

.mapAsync(4)(address => {

emailServer.send(

Email(to = address, title = "Akka", body = "I like your tweet"))

})

.to(Sink.ignore)

sendEmails.run()

mapAsync 应用于给定的函数, 当元素通过这个处理步骤时, 将为它们每一个调用外部服务。函数返回Future,并把future的值发送给下游。将并行运行的Future数量作为 mapAsync 的第一个参数。这些Future可能以任何顺序完成, 但发送给下游的元素的顺序与从上游接收的顺序相同。

这意味着背压如预期的工作。例如,如果emailServer.send是瓶颈,将会限制传入推文的检索速度和email地址查找的速度。

这条管道的最后一块是产生通过电子邮件管道提取tweet作者信息的需求:我们附加一个Sink.ignore,使其全部运行。 如果我们的电子邮件处理将返回一些有趣的数据进行进一步的转换,那么我们当然不会忽视它,而是将结果流发送到进一步的处理或存储。

请注意, mapAsync 保留流元素的顺序。在这个例子中, 顺序并不重要, 我们可以使用更有效的 mapAsyncUnordered:

val authors: Source[Author, NotUsed] =

tweets.filter(_.hashtags.contains(akkaTag)).map(_.author)

val emailAddresses: Source[String, NotUsed] =

authors

.mapAsyncUnordered(4)(author => addressSystem.lookupEmail(author.handle))

.collect { case Some(emailAddress) => emailAddress }

val sendEmails: RunnableGraph[NotUsed] =

emailAddresses

.mapAsyncUnordered(4)(address => {

emailServer.send(

Email(to = address, title = "Akka", body = "I like your tweet"))

})

.to(Sink.ignore)

sendEmails.run()

在上述示例中, 服务方便地返回了一个Future结果。如果不是这样,你需要用Future来包裹调用。如果服务调用涉及阻塞, 还必须确保在专用执行上下文中运行它, 以避免“饥饿”和系统中其他任务的干扰。

val blockingExecutionContext = system.dispatchers.lookup("blocking-dispatcher")

val sendTextMessages: RunnableGraph[NotUsed] =

phoneNumbers

.mapAsync(4)(phoneNo => {

Future {

smsServer.send(

TextMessage(to = phoneNo, body = "I like your tweet"))

}(blockingExecutionContext)

})

.to(Sink.ignore)

sendTextMessages.run()

"blocking-dispatcher"的配置可能类似于:

blocking-dispatcher {

executor = "thread-pool-executor"

thread-pool-executor {

core-pool-size-min = 10

core-pool-size-max = 10

}

}

阻塞调用的另一种替代方法是在map操作中执行这些操作, 但仍使用专用的调度器。

val send = Flow[String]

.map { phoneNo =>

smsServer.send(TextMessage(to = phoneNo, body = "I like your tweet"))

}

.withAttributes(ActorAttributes.dispatcher("blocking-dispatcher"))

val sendTextMessages: RunnableGraph[NotUsed] =

phoneNumbers.via(send).to(Sink.ignore)

sendTextMessages.run()

但是, 这与mapAsync不完全相同, 因为mapAsync可能同时运行多个调用, 但map一次执行一次。

对于一个服务作为一个actor公开,或者一个actor作为一个外部服务前的网关,你可以使用ask:

import akka.pattern.ask

val akkaTweets: Source[Tweet, NotUsed] = tweets.filter(_.hashtags.contains(akkaTag))

implicit val timeout = Timeout(3.seconds)

val saveTweets: RunnableGraph[NotUsed] =

akkaTweets

.mapAsync(4)(tweet => database ? Save(tweet))

.to(Sink.ignore)

请注意, 如果请求在给定的超时时间内未完成, 则流将通过失败完成。如果这不是想要的结果, 你可以使用在ask Future上的recover。

对顺序和并行性的说明

让我们再看看另一个例子, 以更好地了解 mapAsync 和 mapAsyncUnordered 的顺序和并行特性。

几个 mapAsync 和 mapAsyncUnordered future可能同时运行。并发的future数量受到下游需求的限制。例如, 如果下游要求的5个元素, 将有最多5个future在进行中。

mapAsync以收到元素的顺序发送future结果。这意味着,已完成的结果只有在先前的结果都已完成并发送后,才发送给下游。因此,一个缓慢的调用将延迟所有连续调用的结果, 尽管它们在慢速调用之前完成。

mapAsyncUnordered当future结果一完成就发送出去,也就是说,可能发送给下游元素的顺序不与从上游收到的顺序相同。因此,只要下游有多个元素的需求,一个缓慢的调用不会延迟连续调用的结果。

这里是一个虚拟的服务, 我们可以用它来说明这些方面。

class SometimesSlowService(implicit ec: ExecutionContext) {

private val runningCount = new AtomicInteger

def convert(s: String): Future[String] = {

println(s"running: $s (${runningCount.incrementAndGet()})")

Future {

if (s.nonEmpty && s.head.isLower)

Thread.sleep(500)

else

Thread.sleep(20)

println(s"completed: $s (${runningCount.decrementAndGet()})")

s.toUpperCase

}

}

}

以小写字母开头的元素被模拟为需要较长的处理时间。

下面是我们如何使用它与 mapAsync:

implicit val blockingExecutionContext = system.dispatchers.lookup("blocking-dispatcher")

val service = new SometimesSlowService

implicit val materializer = ActorMaterializer(

ActorMaterializerSettings(system).withInputBuffer(initialSize = 4, maxSize = 4))

Source(List("a", "B", "C", "D", "e", "F", "g", "H", "i", "J"))

.map(elem => { println(s"before: $elem"); elem })

.mapAsync(4)(service.convert)

.runForeach(elem => println(s"after: $elem"))

输出可能如下所示:

before: a

before: B

before: C

before: D

running: a (1)

running: B (2)

before: e

running: C (3)

before: F

running: D (4)

before: g

before: H

completed: C (3)

completed: B (2)

completed: D (1)

completed: a (0)

after: A

after: B

running: e (1)

after: C

after: D

running: F (2)

before: i

before: J

running: g (3)

running: H (4)

completed: H (2)

completed: F (3)

completed: e (1)

completed: g (0)

after: E

after: F

running: i (1)

after: G

after: H

running: J (2)

completed: J (1)

completed: i (0)

after: I

after: J

注意,after行的顺序与before行相同,即使元素以不同顺序完成。例如H在g之前完成,但仍在后面发送。

括号中的数字说明同一时间内正在进行的调用数。因此,这里下游需求和并发调用的数量由ActorMaterializerSettings缓冲大小 (4) 限制 。

下面是我们在 mapAsyncUnordered 里使用相同服务:

implicit val blockingExecutionContext = system.dispatchers.lookup("blocking-dispatcher")

val service = new SometimesSlowService

implicit val materializer = ActorMaterializer(

ActorMaterializerSettings(system).withInputBuffer(initialSize = 4, maxSize = 4))

Source(List("a", "B", "C", "D", "e", "F", "g", "H", "i", "J"))

.map(elem => { println(s"before: $elem"); elem })

.mapAsyncUnordered(4)(service.convert)

.runForeach(elem => println(s"after: $elem"))

输出可能如下所示:

before: a

before: B

before: C

before: D

running: a (1)

running: B (2)

before: e

running: C (3)

before: F

running: D (4)

before: g

before: H

completed: B (3)

completed: C (1)

completed: D (2)

after: B

after: D

running: e (2)

after: C

running: F (3)

before: i

before: J

completed: F (2)

after: F

running: g (3)

running: H (4)

completed: H (3)

after: H

completed: a (2)

after: A

running: i (3)

running: J (4)

completed: J (3)

after: J

completed: e (2)

after: E

completed: g (1)

after: G

completed: i (0)

after: I

注意,after行的顺序与before行不同。例如,H赶上了慢G。

括号中的数字说明同一时间内正在进行的调用数。因此,这里下游需求和并发调用的数量由ActorMaterializerSettings缓冲大小 (4) 限制 。

与响应式流集成

响应式流为异步流非阻塞式背压处理定义了一个标准。它使能够连接到符合标准的流库成为可能。Akka Stream就是一个这样的库。

其它实现的不完整列表:

Reactor (1.1+)

RxJava

Ratpack

Slick

在响应式流中两个最重要的接口是Publisher和Subscriber。

import org.reactivestreams.Publisher

import org.reactivestreams.Subscriber

假设有这样的一个库提供了一个推文的发布者:

def tweets: Publisher[Tweet]

而另外一个库知道如何将作者信息存储到数据库:

def storage: Subscriber[Author]

使用Akka Streams Flow ,可以转换流并连接它们:

val authors = Flow[Tweet]

.filter(_.hashtags.contains(akkaTag))

.map(_.author)

Source.fromPublisher(tweets).via(authors).to(Sink.fromSubscriber(storage)).run()

Publisher作为一个输入Source使用到流,而Subscriber作为一个输出Sink。

一个Flow也可以转换到RunnableGraph[Processor[In, Out]],当run()被调用时,它将物化到一个Processor。run()可以被多次调用,每次都会产生一个新的Processor实例。

val processor: Processor[Tweet, Author] = authors.toProcessor.run()

tweets.subscribe(processor)

processor.subscribe(storage)

一个发布者可以通过subscribe方法与一个订阅者连接。

也可以使用Publisher-Sink,将Source作为Publisher:

val authorPublisher: Publisher[Author] =

Source.fromPublisher(tweets).via(authors).runWith(Sink.asPublisher(fanout = false))

authorPublisher.subscribe(storage)

由Sink.asPublisher(fanout = false)创建的publisher仅支持单一订阅。其它的订阅尝试将被拒绝(带有IllegalStateException)。

使用fan-out/broadcasting 创建发布者,可以支持多个订阅者:

def alert: Subscriber[Author]

def storage: Subscriber[Author]

val authorPublisher: Publisher[Author] =

Source.fromPublisher(tweets).via(authors)

.runWith(Sink.asPublisher(fanout = true))

authorPublisher.subscribe(storage)

authorPublisher.subscribe(alert)

该阶段的输入缓冲区大小控制最慢的订阅者与最快订阅者之间的距离,然后才能减慢流的速度。

要使图完整, 还可以通过使用Subscriber-Source将Sink公开为Subscriber:

val tweetSubscriber: Subscriber[Tweet] =

authors.to(Sink.fromSubscriber(storage)).runWith(Source.asSubscriber[Tweet])

tweets.subscribe(tweetSubscriber)

也可以通过传递一个创建Processer实例的工厂函数,将Processor实例解包为一个Flow:

// An example Processor factory

def createProcessor: Processor[Int, Int] = Flow[Int].toProcessor.run()

val flow: Flow[Int, Int, NotUsed] = Flow.fromProcessor(() => createProcessor)

请注意, 工厂是必要的, 以实现可重用的Flow结果。

akka java ask_Akka Stream之集成相关推荐

  1. Java Streams,第 1 部分: java.util.stream 库简介

    Java SE 8 中主要的新语言特性是拉姆达表达式.可以将拉姆达表达式想作一种匿名方法:像方法一样,拉姆达 表达式具有带类型的参数.主体和返回类型.但真正的亮点不是拉姆达表达式本身,而是它们所实现的 ...

  2. java akka_用于大型事件处理的Akka Java

    java akka 我们正在设计一个大型的分布式事件驱动系统,用于跨事务数据库的实时数据复制. 来自源系统的数据(消息)在到达目的地之前经历了一系列转换和路由逻辑. 这些转换是多进程和多线程的操作,包 ...

  3. 用于大型事件处理的Akka Java

    我们正在设计一个大型的分布式事件驱动系统,用于跨事务数据库的实时数据复制. 来自源系统的数据(消息)在到达目的地之前经历了一系列转换和路由逻辑. 这些转换是多进程和多线程的操作,包括可以同时执行的较小 ...

  4. Java Reactor Stream

    一.Reactor 概念/POM 更多内容,前往 IT-BLOG "具有非阻塞负压功能的异步流处理系统" 的标准以及API.主要用来构建 JVM环境下的非阻塞应用程序.它直接和 J ...

  5. 牛逼哄洪的 Java 8 Stream,性能也牛逼么?

    点击上方"方志朋",选择"设为星标" 回复"666"获取新整理的面试资料 Java8的Stream API可以极大提高Java程序员的生产力 ...

  6. java8 group by_java8新特性Java 8 – Stream Collectors groupingBy 示例 - Java教程

    在这篇教程中,将向你展示如何使用Java 8 Stream的Collectors,来对一个List进行分组,计算个数,求和以及排序. 1. Group By, Count and Sort 1.1 对 ...

  7. 从Java 8中的java.util.stream.Stream检索列表

    本文翻译自:Retrieving a List from a java.util.stream.Stream in Java 8 I was playing around with Java 8 la ...

  8. Java 8 Stream Api 中的 skip 和 limit 操作

    点击上方蓝色"程序猿DD",选择"设为星标" 回复"资源"获取独家整理的学习资料! 1. 前言 Java 8 Stream API 中的sk ...

  9. Java 8 Stream API详解--转

    原文地址:http://blog.csdn.net/chszs/article/details/47038607 Java 8 Stream API详解 一.Stream API介绍 Java 8引入 ...

最新文章

  1. JSP、EL和JSTL-学习笔记01【JSP基础语法】
  2. Spring Boot下的lombok安装以及使用简介
  3. 粘性控件,滑动停留StickLayout(导航栏滑动停留)
  4. jboss7 关闭日志打印_使用自定义日志记录处理程序在JBoss AS 7中跟踪SQL语句
  5. 怎样配oracle环境,oracle配置会话环境(set命令)
  6. android viewpager fragment 生命周期,ViewPager中Fragment的生命周期
  7. 基于Spring Security的AJAX请求需要登录的解决方案
  8. 上课流程法-如何上好第一节课(1) 目录 1. 目录 1 1.1. 销售自己 1 1.2. 销售课程 1 1.3. 学习方法 1 1.4. 制定规章 2 2. 销售自己-自我介绍 2 2.1.
  9. PyTorch1.2安装(Anaconda3 + Python3.6 + cpu版本)
  10. 一个正经的前端学习 开源 仓库(每日更新)-648道知识点
  11. 周伟焜挥别IBM 一个时代结束
  12. ne_products 表
  13. Gobelieve 架构(转载)
  14. 区块链游戏走出一地鸡毛,元宇宙3D国风链游或成最大受益者
  15. 苹果开发者账号添加设备
  16. Gitee推送本地文件到仓库并且创建子文件夹(详细)
  17. 阿里云云原生数据湖体系全解读——数据湖 云原生计算引擎
  18. hal库中的UART使用
  19. matlab 场仿真,利用MATLAB软件实现温度场的仿真
  20. 基于高德POI搜索的地址类型判断

热门文章

  1. 剑指Offer(十九):顺时针打印矩阵
  2. 海康机器人图像采集卡安装与使用
  3. 该选国企,外企还是私企?
  4. n皇后 问题(原始的8皇后问题)
  5. python 异步文件读写_Python中使用asyncio 封装文件读写
  6. Win32K里的死循环
  7. WannaCry是什么 如何发起攻击
  8. 计算机设备驱动器空白文档,最新Wmdunh计算机基础知识(有试题和答案).docx
  9. 中国地图json、世界地图json
  10. UA-DETRAC BITVehicle车辆检测数据集(含下载地址)