本章包括:

  • 为什么流【streams】是事件【eventing】之上的一个有用的抽象
  • 什么是背压【back-pressure】,为什么它是异步生产者和消费者的基础
  • 如何从流【streams】中解析协议数据

到目前为止,我们一直在使用回调【callbacks】来处理事件,这些回调来自各种来源,例如HTTP或TCP服务器。回调【Callbacks 】使我们可以一次对一个事件进行推理【reason about】。

从TCP连接、文件或HTTP请求传入的数据缓冲区【data buffer】,在处理上没有太大区别:您需要声明一个回调处理程序,它对每个事件作出响应,并允许自定义处理。

话虽如此,但是大多数事件需要作为一系列事件【series】而不是孤立事件【isolated】来处理。处理HTTP请求的请求体【body】就是一个很好的例子,因为需要组装【assembled】几个大小不同的缓冲区以重新构成【reconstitute 】完整的请求体负载【body payload】。

由于响应式应用程序处理非阻塞I/O,因此有效【efficient 】和正确【correct】的流处理是关键。在本章中,我们将探讨为什么流会带来挑战,以及Vert.x如何提供了一个全面的统一流模型。

4.1 统一的流模型

Vert.x提供了跨多种资源类型(例如文件【file】,网络套接字【network sockets】等)的流的统一抽象。

  • 读流【read stream】是可以被读取的事件的来源【source】
  • 写流【write stream】是事件被发送到的目标地址【destination】。

例如,HTTP请求是读流,HTTP响应是写流。

Vert.x中的流涵盖了各种各样的数据源/发生器【source】和接收器【sink】,如表4.1所示:

Stream resource Read support Write support
TCP sockets YES YES
UDP datagrams YES YES
HTTP requests and responses YES YES
WebSockets YES YES
Files YES YES
SQL results YES NO
Kafka events YES YES
Periodic timers YES NO

Table 4.1 Vert.x common read and write streams

读流和写流是分别通过io.vertx.core.streams包中的ReadStream和WriteStream接口定义的。您将主要处理实现这两个接口的API,而不是自己实现它们,尽管如果当您需要连接到某些第三方异步事件API时,您可能必须这样做。

这些接口中的API可以大致分作两部分:

  • 读写数据的基本方法
  • 背压管理方法(我们将在下一节中介绍)

表4.2列出了读流【read streams】的基本方法。它们定义了三种类型的事件通知的回调:

  • 已读取了某些数据【 some data has been read】
  • 已发生异常【an exception has arisen】
  • 流已结束【the stream has ended】
Method Description
handler(Handler < T >) 处理一个T类型的新读取到的值(例如,Buffer, byte[], JsonObject等)
exceptionHandler(Handler< Throwable >) 处理读异常
endHandler(Handler< Void >) 当流结束时调用,要么是因为已读取所有数据,要么是因为引发了异常

Table 4.2 ReadStream essential methods

类似地,表4.3中列出了写流【write stream】的基本方法:

  • 写入数据
  • 结束流
  • 在出现异常时得到通知:
Method Description
write(T) 写入一些T类型的数据(例如,Buffer, byte[], JsonObject等)。
exceptionHandler(Handler< Throwable >) 处理写异常
end() 结束流
end(T) 写入一些T类型的数据,然后结束流

Table 4.3 WriteStream essential methods

在前面的章节中,我们已经在不知情的情况下操纵了流,比如使用TCP和HTTP服务器。

java.io API形成了一个经典的流I/O抽象,用于在Java中从各种资源【source】读取和写入数据,尽管它使用的是阻塞API。将JDK streams 与Vert.x non-blocking stream API进行比较很有趣。

假设我们想要读取文件的内容并将其内容输出到标准控制台输出。

public static void main(String[] args) {File file = new File("build.gradle.kts");byte[] buffer = new byte[1024];try (FileInputStream in = new FileInputStream(file)) {   ❶ 使用try-with-resource确保reader.close() 总是会被调用,无论是正常结束还是异常终止int count = in.read(buffer);while (count != -1) {System.out.println(new String(buffer, 0, count));count = in.read(buffer);}} catch (IOException e) {e.printStackTrace();} finally {System.out.println("\n--- DONE");                      ❷ 读取完成后,我们向控制台插入两行代码。}
}

Listing 4.1 Reading a file using JDK I/O APIs

代码清单4.1展示了一个经典示例,该示例使用JDK I/O流读取文件,然后将其内容输出到控制台,同时处理了可能出现的错误。我们将数据读取到缓冲区中,然后立即将缓冲区中的内容写入到标准控制台,然后再回收缓冲区以进行下一次读取。

代码清单4-2展示了一个与代码清单4.1中功能相同的代码,但是它使用了Vert.x异步文件API:

public static void main(String[] args) {Vertx vertx = Vertx.vertx();OpenOptions opts = new OpenOptions().setRead(true);          ❶ 使用Vert.x打开文件需要一些选项,例如文件是否处于读取,写入,追加模式等。vertx.fileSystem().open("build.gradle.kts", opts, ar -> {    ❷ 打开文件是一个异步操作。if (ar.succeeded()) {AsyncFile file = ar.result();                            ❸ AsyncFile 是Vert.x异步文件的接口file.handler(System.out::println)                        ❹ 新缓冲区数据的回调.exceptionHandler(Throwable::printStackTrace)          ❺ 异常发生时的回调.endHandler(done -> {                                  ❻ 流关闭时的回调System.out.println("\n--- DONE");vertx.close();});} else {ar.cause().printStackTrace();}});
}

Listing 4.2 Reading a file using Vert.x streams

这里的方法是声明式的,因为我们在读取流时为不同类型的事件【events 】定义了处理程序【handlers 】。在代码清单4-2中,我们正在被推送【Push】数据,而在代码清单4.1中,我们是从流中拉取【Pull】数据。

乍一看,这种差异似乎只是表面上的,在代码清单4-1例子中,数据是拉取【Pull】的,而在清单4-2的例子中,数据则是被推送【Push】的。但是,差异是很大的,无论是使用Vert.x还是其他解决方案,我们都需要了解它以掌握异步流【asynchronous streams】。

这就引出了“背压【back-pressure】”的概念。

4.2 什么是背压

背压是一种机制,事件的消费者可以通过该机制,向事件的生产者发出信号,通知它正在以比消费者处理事件更快的速率发送着事件。在响应式系统中,背压用于使生产者暂停或减慢速度,这样消费者就可以避免在无限界的内存缓冲区【unbounded memory buffers】中累积未处理的事件,从而可能耗尽资源。

为了理解为什么背压【back-pressure】对异步流很重要,让我们以一个用于下载Linux发行版映像的HTTP服务器为例,并考虑在没有任何背压管理策略下的实现。

Linux发行版映像通常以.iso文件的形式发布,很容易就会占用几个GB的内存。实现一个可以分发此类文件的服务器需要做以下工作:

  • 开启一个HTTP服务器
  • 对于每个传入的HTTP请求,找到相应的文件
  • 将每个从文件中读取的缓冲区写入到HTTP响应体中

图4.1展示了这是如何与Vert.x一起工作的,尽管这也适用于任何非阻塞I/O API。从文件流中读取数据缓冲区【Data buffers】,然后将其传递给处理程序【handler】。该处理程序除了直接将每个缓冲区【buffer】写入到HTTP响应流中之外,几乎不做任何其他事情。每个缓冲区【buffer】最终将被直接(或作为较小的块)写入到底层TCP缓冲区中去。由于TCP缓冲区可能已经满了(可能是网络原因,也可能是客户端繁忙),所以有必要维护一个缓冲区【buffer】,用于缓冲写入的缓冲【pending buffers】(即图4.1中的写入队列【write queue】)。记住,写操作是非阻塞的,因此需要缓冲。这听起来像是一个非常简单的处理管道,那么可能会出现什么问题呢?

Figure 4.1 Reading and then writing data between streams without any back-pressure signaling

从文件系统中读取通常是快速和低延迟的,并且给定多个读取请求,操作系统可能会将一些Page页缓存到RAM中。相比之下,向网络写入则要慢得多,而且带宽取决于最弱的网络链路。延迟也会发生。

如图4.1所示,由于读操作比写操作要快得多,写缓冲【write buffer】可能会迅速变大。如果我们有几千个下载ISO镜像的并发连接,我们可能会在写入队列【write queue】中累积大量缓冲区。实际上,在JVM进程内存中可能有几个GB的ISO映像,正在等待通过网络写入!写入队列【write queue】中的缓冲区越多,进程消耗的内存就越多。

这里的风险显然是资源耗尽,因为进程吃掉了所有可用的物理内存,或者因为它运行在一个内存受限的环境中,比如容器中。这增加了消耗太多内存甚至崩溃的风险。

正如您可能猜到的那样,一种解决方案是背压信号【back-pressure signaling】,它使读流【read stream】能够适应写流【write stream】的吞吐量。在前面的示例中,当HTTP响应的写入队列【write queue】太大时,它应该能够通知文件读流【file read stream】它运行得太快了。在实践中,暂停发生器流【source stream】是管理背压的一种好方法,因为这样就可以有时间将写缓冲中的条目【ittem】写出,而不会累积新的条目【ittem】。

TIP 阻塞I/O API有一种隐式的背压形式,即通过阻塞执行线程直到I/O操作完成为止。当缓冲区满时,写操作会阻塞,这将阻止被阻塞的线程拉取【pull】更多数据,直到写操作完成为止。

Method Description
pause() 暂停流,防止后续数据被发送到处理程序
resume() 再次开始读取数据并将其发送给处理程序
fetch(n) 要求读取(最多)n个元素。在调用fetch(n)之前,流必须暂停

Table 4.4 ReadStream back-pressure management methods

当一个读流被暂停时,我们可以在其之上通过fetch获取一定数量的元素,这是异步拉取【asynchronous pulling】的一种形式。这意味着处理程序可以使用fetch请求元素,设置自己的速度。你将在本章的最后一节看到具体的例子。

在任何情况下,调用resume()都会导致流以尽可能快的速度再次开始推送【pushing 】数据。

表4.5显示了WriteStream中相应的背压管理方法。

Method Description
setWriteQueueMaxSize(int) 定义在认为已满之前,最大写缓冲队列【write buffer queue】大小应该是多少。注意:该方法所设置的单位并非字节
boolean writeQueueFull() 表示写缓冲队列【write buffer queue】已满
drainHandler(Handler< Void >) 定义一个回调函数,当写缓冲队列【write buffer queue】由满【full】过渡到排干时被调用(通常是当它回到最大大小的一半时)

写缓冲队列【write buffer queue】有一个最大大小,超过这个大小就被认为是满【full】的。写队列【Write queues】有默认大小,您很少需要调整,但如果您愿意,也可以这样做。需要注意的是,当队列满了时,仍然可以进行写操作,并且数据将仍然在队列中累积。写入者应该检查队列是否已满,但没有要求写入者必须强制执行。当写入者知道写队列【Write queues】已满时,可以通过一个排干处理程序【drain handler】通知它可以重新写入数据。通常情况下,当一半的写队列【Write queues】已经被排干时,就会发生这种情况。

现在您已经看到了ReadStream和WriteStream中提供的背压操作,下面是我们通过HTTP提供ISO镜像下载的示例中的流程控制图谱:

  1. 对于每个读缓冲【read buffer】,将其写入HTTP响应流中
  2. 检查写缓冲【write buffer】是否已满
  3. 如果满了,则先暂停文件读流【file read stream】,然后安装一个排干处理程序【drain handler】,当它被调用时,可以重新唤醒文件读流【file read stream】

注意,这种背压管理策略并不总是你所需要的:

  • 有些情况下,在写队列【write queue】满时,删除数据在功能上是正确的,甚至是可取的。
  • 有些事件源并不支持像Vert.x ReadStream 那样的暂停,您需要在删除数据和缓冲之间做出选择,即使这可能会导致内存耗尽。

处理背压的适当策略取决于您所编写的代码的功能要求。一般来说,您会更喜欢像Vert.x streams提供的流控制,但是当这不可能时,您就需要采用另一种策略。

现在让我们将所看到的所有内容整合到一个应用程序中。

4.3 Making a music-streaming jukebox

我们将通过一个音乐流媒体点唱机的例子来阐明Vert.x streams 和背压管理(参见图4.2)

Figure 4.2 Jukebox application overview

其思路是,点唱机有几个MP3文件存储在本地,客户端可以通过HTTP连接来侦听流。也可以通过HTTP下载单个文件。反过来,通过一个简单的、基于文本的TCP协议来控制何时播放、暂停和安排歌曲。所有连接的播放器【players】将在同一时间收听到相同的音频,除了由于播放器【players】设置的缓冲造成的轻微延迟。

这个例子将让我们看到如何处理自定义流量调整和不同的背压管理策略,以及如何解析流。

4.3.1 FEATURES AND USAGE

我们将要构建的应用程序可以使用一个Gradle任务从本书GitHub库中的代码中运行,如清单4.3的控制台输出所示。

Note 如果你想让点唱机播放音乐,你需要将一些MP3文件复制到项目目录下名为tracks/的文件夹中。

$ ./gradlew run -PmainClass=chapter4.jukebox.Main                 ❶ Main类是chapter4.jukebox.Jukebox> Task :run
[vert.x-eventloop-thread-0] chapter4.jukebox.Jukebox - Start      ❷ 我们部署了两个verticle
[vert.x-eventloop-thread-1] chapter4.jukebox.NetControl - Start

Listing 4.3 Running the jukebox application

在这个应用程序中部署了两个verticles :

  • Jukebox 提供主要的音乐流【music-streaming】逻辑和HTTP服务器接口,以供播放器【players】连接。
  • NetControl 提供一个基于文本的TCP协议,用于远程控制点唱机应用程序。

Figure 4.3 VLC connected to the jukebox

想要听音乐,用户可以通过VLC这样的播放器(见图4.3)连接上来,甚至可以直接在http://localhost:8080/打开web浏览器。

另一方面,我们还可以通过netcat这样的工具控制播放器,比如用纯文本命令列出所有文件,安排一个曲目播放,暂停或重启流。清单4.4显示了一个使用netcat的交互式会话。

$ netcat localhost 3000                                     ❶ 控制TCP服务器监听端口3000
/list                                                       ❷ 命令行列出所有文件
Daniela-La-Luz-Did-you-Ever-(Original-Mix).mp3
The-Revenge-Let-Love-Take-The-Blame-(Original-Mix).mp3
intro.mp3
SQL-Surrender-(Original-Mix).mp3
/schedule SQL-Surrender-(Original-Mix).mp3                  ❸ 调度将文件添加到播放列表中
/pause                                                      ❹ 这将暂停所有连接的播放器的流。
/play                                                       ❺ 唤醒流
/schedule Daniela-La-Luz-Did-you-Ever-(Original-Mix).mp3    ❻ 当第一首曲目结束时,我们安排另一首曲目。
^C                                                          ❼ 我们可以用Ctrl+C退出netcat会话,不会造成任何伤害。

Listing 4.4 Controlling the jukebox with netcat

TIP netcat在你的Unix环境中可以作为nc使用。我不知道在WSL环境之外还有什么友好的、等价的Windows工具。

最后,我们希望能够通过HTTP下载任何已知文件名的MP3:

curl -o out.mp3 http://localhost:8080/download/intro.mp3

现在让我们详细分析实现的各个部分。

4.3.2 HTTP PROCESSING: THE BIG PICTURE

将有许多代码片段涉及HTTP服务器处理,因此最好查看图4.4,以了解下一段代码将如何组合在一起。

HTTP服务器处理涉及许多代码片段,因此,我么可以结合图4.4,来为了理解下面几段代码是如何组合在一起的。

Figure 4.4 Big picture of the HTTP server processing

传入的HTTP请求有两种类型:

  • 客户端希望直接按名称下载文件
  • 或者希望加入音频流

二者的处理策略是非常不同的。

在下载文件的情况下,目标是执行从文件读流【file read stream】到HTTP响应写流【HTTP response write stream】的直接复制。这将通过背压管理来实现,以避免过度缓冲【excessive buffering】。

流媒体有点复杂,因为我们需要跟踪所有流媒体的HTTP响应写流。计时器定期从当前MP3文件中读取数据,并为每个流【streamer】复制和写入数据。

流式传输【streaming】有点复杂,因为我们需要跟踪所有流式传输的HTTP响应写流【HTTP response write streams】。计时器定期从当前MP3文件读取数据,并为每个流【streamer】写入数据。

让我们看看这些部分是如何实现的。

4.3.3 JUKEBOX VERTICLE BASICS

如清单4-5所示,Jukebox verticle类的状态是由一个播放状态【play status】和一个播放列表【playlist】定义的:

private enum State {PLAYING, PAUSED}private State currentMode = State.PAUSED;private final Queue<String> playlist = new ArrayDeque<>();

Listing 4.5 State of the Jukebox class

枚举类型State定义了两种状态,而Queue 则保存了接下来要播放的所有预定音频。再次,绿色。x线程模型确保单线程访问,因此不需要并发集合和临界区。

同样,Vert.x线程模型保证了单线程访问【single-threaded access】,因此无需使用并发集合,也无需关心临界区。

Jukebox verticle的start方法(清单4.6)需要配置几个事件总线处理程序【event-bus handlers】,这些处理程序对应于可以从TCP文本协议使用的命令和操作。NetControl verticle处理TCP服务器的内部并将消息发送到事件总线,我们稍后将对此进行分析。

@Override
public void start() {EventBus eventBus = vertx.eventBus();eventBus.consumer("jukebox.list", this::list);eventBus.consumer("jukebox.schedule", this::schedule);eventBus.consumer("jukebox.play", this::play);eventBus.consumer("jukebox.pause", this::pause);// (...more later!)
}

Listing 4.6 Setting up the event-bus handlers in the Jukebox verticle

请注意,因为我们已经抽象出将命令在事件总线上的传输,所以我们可以很容易地插入新的命令来控制点唱机,比如使用移动应用程序、web应用程序等等。

下面的清单提供了播放/暂停处理程序和调度处理程序。这些方法直接操作播放状态和播放列表状态。

private void play(Message<?> request) {currentMode = State.PLAYING;
}private void pause(Message<?> request) {currentMode = State.PAUSED;
}private void schedule(Message<JsonObject> request) {String file = request.body().getString("file");if (playlist.isEmpty() && currentMode == State.PAUSED) {    ❶ currentMode = State.PLAYING;}playlist.offer(file);
}

Listing 4.7 Play/pause and schedule operations in the Jukebox verticle

如下一个清单所示,列出所有可用文件要复杂一些。

private void list(Message<?> request) {vertx.fileSystem().readDir("tracks", ".*mp3$", ar -> {   ❶ 我们异步获取trace/文件夹下所有mp3文件if (ar.succeeded()) {List<String> files = ar.result().stream().map(File::new).map(File::getName).collect(Collectors.toList());JsonObject json = new JsonObject().put("files", new JsonArray(files));request.reply(json);                                 ❷ 我们构建一个JSON 响应} else {logger.error("readDir failed", ar.cause());request.fail(500, ar.cause().getMessage());          ❸ 这是在事件总线上的请求/应答通信【request/reply communication】中发送失败代码和错误消息的示例。}});
}

Listing 4.8 Listing all available files in the Jukebox verticle

4.3.4 INCOMING HTTP CONNECTIONS

有两种类型的HTTP客户端:

  • 想要音频流
  • 想要下载文件

HTTP服务器在verticle 的start方法中启动(参见下一个清单)。

@Override
public void start() {EventBus eventBus = vertx.eventBus();eventBus.consumer("jukebox.list", this::list);eventBus.consumer("jukebox.schedule", this::schedule);eventBus.consumer("jukebox.play", this::play);eventBus.consumer("jukebox.pause", this::pause);vertx.createHttpServer().requestHandler(this::httpHandler).listen(8080);// (...more later!)    ❶ 我们将在稍后的MP3流式传输中对此进行扩展。
}

Listing 4.9 Setting up the HTTP server in the Jukebox verticle

Vert.x HTTP服务器使所用的请求处理程序如下所示。它将HTTP请求转发到openAudioStream 和download 方法,这些方法完成请求并继续。

private void httpHandler(HttpServerRequest request) {if ("/".equals(request.path())) {openAudioStream(request);return;}if (request.path().startsWith("/download/")) {String sanitizedPath = request.path().substring(10).replaceAll("/", "");❶ 这个字符串替换可以防止恶意尝试从其他目录读取文件(想想有人愿意读取/etc/passwd)download(sanitizedPath, request);return;}request.response().setStatusCode(404).end();                              ❷ 当没有匹配的,我们返回404响应
}

Listing 4.10 HTTP request handler and dispatcher

下面的清单显示了openAudioStream方法的实现。它将流准备为分块模式【chunking mode】,并设置适当的Content-Type,并将响应对象放在一边供之后使用。

private final Set<HttpServerResponse> streamers = new HashSet<>();   ❶ 我们在一组HTTP响应中跟踪所有当前流【streamer】。private void openAudioStream(HttpServerRequest request) {HttpServerResponse response = request.response().putHeader("Content-Type", "audio/mpeg").setChunked(true);                                               ❷ 它是一个流,所以长度未知streamers.add(response);response.endHandler(v -> {streamers.remove(response);                                      ❸ 当一个流退出时,它将不再被跟踪。logger.info("A streamer left");});
}

Listing 4.11 Dealing with new stream players

4.3.5 DOWNLOADING AS EFFICIENTLY AS POSSIBLE

载文件就是一个很好的例子,其中可以使用背压管理来协发生器流【source stream】(即文件)和接收器流【sink stream】(即HTTP响应)。

下面的清单显示了我们如何查找文件,当它存在时,我们将最终的下载任务委托给downloadFile方法

private void download(String path, HttpServerRequest request) {String file = "tracks/" + path;if (!vertx.fileSystem().existsBlocking(file)) {      ❶ 除非您是在一个网络文件系统上,否则可能的阻塞时间是有限的,因此我们避免了嵌套的回调级别。request.response().setStatusCode(404).end();return;}OpenOptions opts = new OpenOptions().setRead(true);vertx.fileSystem().open(file, opts, ar -> {if (ar.succeeded()) {downloadFile(ar.result(), request);} else {logger.error("Read failed", ar.cause());request.response().setStatusCode(500).end();}});
}

Listing 4.12 Download method

方法downloadFile 的实现如下面的清单所示:

private void downloadFile(AsyncFile file, HttpServerRequest request) {HttpServerResponse response = request.response();response.setStatusCode(200).putHeader("Content-Type", "audio/mpeg").setChunked(true);file.handler(buffer -> {response.write(buffer);if (response.writeQueueFull()) {                ❶ 如果写队列【write queue】中的字节数比使用setWriteQueueMaxSize设置的值多,则返回true!file.pause();                                 ❷ 通过暂停读流【read stream】来背压应用程序response.drainHandler(v -> file.resume());    ❸ 当排干时,唤醒}});file.endHandler(v -> response.end());
}

Listing 4.13 Downloading a file

在两个流之间复制数据时,需要考虑背压。当策略是暂停发生器【source】并且不能丢失任何数据时,通常会这样做,因此可以像下面的清单中那样重写这些相同的代码。

HttpServerResponse response = request.response();
response.setStatusCode(200).putHeader("Content-Type", "audio/mpeg").setChunked(true);file.pipeTo(response);         ❶ 将数据从文件传输到响应中

Listing 4.14 Pipe helper

在ReadStream 和WriteStream之间进行复制时,通常使用管道来处理背压。它还负责管理发生器流【source strea,】的结束【end 】和两个流上的错误【error】。清单4.14的代码与清单4.13所示的代码完全相同,但没有了样板代码。pipeTo还有其他一些变体用于指定自定义处理程序。

4.3.6 READING MP3 FILES, BUT NOT TOO FAST

MP3文件有一个头【header 】,其中包含艺术家名称、类型、比特率等元数据。随后有几个帧包含压缩音频数据,解码器可以将其转换为脉冲编码调制数据,最终将其转换为声音。

MP3解码器对错误的适应能力很强,所以如果他们在文件中间开始解码,他们仍然会设法计算出比特率,他们会对齐下一帧开始解码音频。您甚至可以将多个MP3文件联结【concatenate 】在一起,并将它们发送到播放器。只要所有文件使用相同的比特率和立体声模式,音频将可以被解码。

当我们设计一个音乐流媒体点唱机时,这对我们来说很有趣:如果我们的文件以相同的方式进行编码,我们可以一个接一个地推送播放列表中的每个文件,解码器将很好地处理这些音频。

WHY BACK-PRESSURE ALONE IS NOT ENOUGH

把MP3数据输送给许多联网的播放器并不像看起来那么简单。主要的问题是确保所有现在和未来接入的播放器都能在大致相同的时间听相同的音乐。所有的播放器都有不同的本地缓冲策略,即使是在网络延迟的情况下,仍然可以确保流畅的回放,但是如果服务器只是尽可能快地推送文件,并不是所有的客户端都将同步。更糟糕的是,当一个新的播放器连接上来,更糟糕的是,它可能不会收到任何播放内容,而当前(其他)的播放器在缓冲区中还有几分钟的音乐。为了提供合理的回放体验,我们需要控制读取文件的速度,为此我们将使用计时器。

这在图4.5中进行了说明,图中显示了在没有和有速率控制的情况下流会发生什么。在这两种情况下,假设玩家A在开始时加入流,而玩家B在10秒后加入流。如果没有读取速率控制,我们会发现自己处于与下载MP3文件类似的情况。在将MP3数据块复制到连接的客户端时,我们可能会有反向压力,以确保有效的资源使用,但流体验将非常糟糕。

如图4.5所示,它显示了在没有对流的速率控制的情况下会发生什么。在这两种情况下,假设播放器A在一开始就加入到流中,而播放器B在10秒钟后才加入进来。如果没有读取速率控制的话,我们发现自己的情况与下载MP3文件的情况类似。在将MP3数据块复制到连接的客户端时,我们可以施加背压,以实现有效的资源使用。但这样的话,流式传输【streaming】体验将会非常糟糕。

Figure 4.5 Streaming without and with rate control

因为我们基本上是在尽可能快地流式传输数据【streaming data】,播放器A发现它的内部缓冲区几乎装满了所有的当前文件数据。虽然它可能处于0分15秒的位置上,但它已经接收了超过3分钟的数据。当播放器B加入时,它开始接收文件中更远的MP3数据块,所以它在3分30秒的位置开始播放。如果将我们的推理扩展到多个文件,一个新的播放器可以接入但是不接收任何数据,而之前连接的播放器可能有多个歌曲在他们的内部缓冲等待播放。

相比之下,如果我们控制MP3文件的读取速率,也就是MP3块被复制和写入到连接的播放器的速率,我们就可以确保它们或多或少都处于相同位置。

这里的速率控制是为了确保所有播放器都能以足够快的速度接收数据,这样他们就可以在没有中断的情况下播放音乐,但又不能太快,这样他们就不会缓冲太多的数据。

RATE-LIMITED STREAMING IMPLEMENTATION

下面我们来看一下完整的Jukebox verticle的start方法,这里它需要用到一个计时器:

@Override
public void start() {EventBus eventBus = vertx.eventBus();eventBus.consumer("jukebox.list", this::list);eventBus.consumer("jukebox.schedule", this::schedule);eventBus.consumer("jukebox.play", this::play);eventBus.consumer("jukebox.pause", this::pause);vertx.createHttpServer().requestHandler(this::httpHandler).listen(8080);vertx.setPeriodic(100, this::streamAudioChunk);     ❶ streamAudioChunk 周期性的将MP3数据推送出去(100毫秒是纯粹的经验,所以可以随意调整)。
}

Listing 4.15 Jukebox verticle class start method

除了连接事件总线处理程序和启动HTTP服务器之外,start方法还定义了一个计时器,以便每100毫秒流式传输一次数据。

接下来,我们来看看streamAudioChunk方法的实现。

private AsyncFile currentFile;
private long positionInFile;private void streamAudioChunk(long id) {if (currentMode == State.PAUSED) {return;}if (currentFile == null && playlist.isEmpty()) {currentMode = State.PAUSED;return;}if (currentFile == null) {openNextFile();}currentFile.read(Buffer.buffer(4096), 0, positionInFile, 4096, ar -> {   ❶ 缓冲【Buffer】不能跨I/O操作重用,因此我们需要一个新的缓冲区。if (ar.succeeded()) {processReadBuffer(ar.result());                                      ❷ 这是数据被复制到所有接入的播放器的地方。} else {logger.error("Read failed", ar.cause());closeCurrentFile();}});
}

Listing 4.16 Streaming file chunks

streamAudioChunk的代码最多读取4096字节的块。由于该方法总是每秒调用10次,因此还需要检查是否有任何内容正在播放。processReadBuffer方法对数据进行流处理,如下面的清单所示。

private void processReadBuffer(Buffer buffer) {positionInFile += buffer.length();if (buffer.length() == 0) {                       ❶ 当到达文件的末尾时就会发生这种情况closeCurrentFile();return;}for (HttpServerResponse streamer : streamers) {if (!streamer.writeQueueFull()) {               ❷ 背压streamer.write(buffer.copy());                ❸ 记住,缓冲区不能被重用。}}
}

Listing 4.17 Streaming data chunks to players

对于每个发送到播放器的HTTP响应流,该方法负责将读取的数据复制到响应中。注意,这里还有另一个背压管理:当客户端的写队列【write queue 】已满时,我们简单地丢弃数据。在客户端,这将导致音频质量下降,但由于服务器上的队列已满,这意味着玩家无论如何都会出现延迟或音质下降。丢弃数据是没问题的,因为MP3解码器知道如何恢复【recover】,这可以确保与其他播放器的播放时间保持紧密一致。

WARNING Vert.x缓冲区一旦被写入,就不能被重用,因为它们被放置在写入队列中。重用缓冲区总是会导致bug,所以不要在这里寻找不必要的优化。

最后,下面清单中的助手方法允许打开和关闭文件。

private void openNextFile() {OpenOptions opts = new OpenOptions().setRead(true);currentFile = vertx.fileSystem().openBlocking("tracks/" + playlist.poll(), opts);    ❶ 同样,我们使用了阻塞式变体,但在打开文件时很少会出现问题。positionInFile = 0;
}private void closeCurrentFile() {positionInFile = 0;currentFile.close();currentFile = null;
}

Listing 4.18 Opening and closing files

4.4 Parsing simple streams

到目前为止,我们对点唱机例子的分析主要集中在用于下载和传输MP3数据的Jukebox verticle 上。现在是时候分析NetControl verticle 部分了,它在端口3000上公开了TCP服务器,用于接收文本命令来控制点唱机播放的内容。从异步数据流中提取数据是一个常见的需求,Vert.x为此提供了有效的工具。

我们的文本协议中的命令的形式如下:

/action [argument]

下面是支持的action动作:

  • /list 列出可用于回放的文件
  • /play 确保流播放
  • /pause 暂停流
  • /schedule file 在播放列表的末尾追加文件

每个文本行【text line】可以只有一个命令,因此协议被称为换行分隔【newline-separated】。

为此我们需要一个解析器,因为缓冲区以块的形式到达,每个块很少对应一行。例如,第一个读缓冲【read buffer】可以包含以下内容:

ettes.mp3
/play
/pa

下一个读缓冲【read buffer】中的内容则可能类似于这样:

use
/schedule right-here-righ

接下来的则可能是这样:

t-now.mp3

我们真正想要的是逐行进行推理,所以解决方案是在缓冲区到达时对缓冲进行联结【concatenate】,然后在换行符上再次分割它们,这样每个缓冲区有一行。Vert.x通过RecordParser类提供了一个方便的解析助手程序,而不需手动组装中间缓冲。解析器通过查找分隔符或处理固定大小的块,摄取缓冲区并使用已解析数据生成新的缓冲区。

在本例中,我们需要在流中寻找换行分隔符。下面的清单展示了如何在NetControl垂直中使用RecordParser。

@Override
public void start() {vertx.createNetServer().connectHandler(this::handleClient).listen(3000);
}private void handleClient(NetSocket socket) {RecordParser.newDelimited("\n", socket)                ❶ 通过查找新行进行解析.handler(buffer -> handleBuffer(socket, buffer))     ❷ 现在缓冲的都是一行.endHandler(v -> logger.info("Connection ended"));
}

Listing 4.19 A recordparser based on newlines over a TCP server stream

解析器既是读流又是写流,因为它充当两个流之间的适配器。它接收来自TCP套接字的中间缓冲,并将解析后的数据作为新缓冲发出。这是相当透明的,并简化了verticle实现的其余部分。

在下一个清单中,每个缓冲区都是一行,所以我们可以直接处理命令。。

private void handleBuffer(NetSocket socket, Buffer buffer) {String command = buffer.toString();                         ❶ 使用默认字符集进行缓冲区到字符串的解码switch (command) {case "/list":listCommand(socket);break;case "/play":vertx.eventBus().send("jukebox.play", "");break;case "/pause":vertx.eventBus().send("jukebox.pause", "");break;default:if (command.startsWith("/schedule ")) {schedule(command);} else {socket.write("Unknown command\n");}}
}

Listing 4.20 Handling parsed buffers

简单的命令在case子句中,其他命令在单独的方法中,如下面清单所示:

private void schedule(String command) {String track = command.substring(10);                         ❶ 前10个字符是 /schedule和一个空格JsonObject json = new JsonObject().put("file", track);vertx.eventBus().send("jukebox.schedule", json);
}private void listCommand(NetSocket socket) {vertx.eventBus().request("jukebox.list", "", reply -> {if (reply.succeeded()) {JsonObject data = (JsonObject) reply.result().body();data.getJsonArray("files").stream().forEach(name -> socket.write(name + "\n"));   ❷ 们将每个文件名写入标准控制台输出} else {logger.error("/list error", reply.cause());}});
}

Listing 4.21 Other commands

4.5 Parsing complex streams

流可能比文本行更复杂,RecordParser也可以简化我们的工作。让我们以键/值数据库存储为例,其中每个键和值都是一个字符串。

在这样的数据库中,我们可以有1 -> {foo} 和 2 -> {bar, baz}这样的条目,其中 1 和 2 是键。有无数种方法可以定义这种类型的数据结构的序列化方案,所以假设我们现在必须使用表4.6中的流格式。

Data Description
Magic header 1、2、3和4这样的字节序列,用于标识文件类型
Version 数据库流格式版本号
Name 以字符串形式表示的数据库名称,以换行符结束
Key length 键的字符长度
Key name 用于键的字符序列
Value length 值的字符长度
Value 用于值的字符序列
(…) 剩余的{key, value}序列

Table 4.6 Database stream format

这种格式混合了二进制记录和文本记录,因为流以一个魔数、一个版本号、一个名称开始,然后是键和值的序列。虽然格式本身在某些方面存在问题,但它是一个很好的例子,可以说明更复杂的解析。

首先,让我们有一个程序,它将数据库写入到一个具有两个键/值项的文件。下面的代码清单展示了如何使用Vert.x文件系统API打开文件,向缓冲区追加数据,最终将数据写入文件。

AsyncFile file = vertx.fileSystem().openBlocking("sample.db",new OpenOptions().setWrite(true).setCreate(true));
Buffer buffer = Buffer.buffer();buffer.appendBytes(new byte[] { 1, 2, 3, 4});   ❶ Magic number
buffer.appendInt(2);                            ❷ Version
buffer.appendString("Sample database\n");       ❸ Database nameString key = "abc";                             ❹ First entry
String value = "123456-abcdef";
buffer.appendInt(key.length()).appendString(key).appendInt(value.length()).appendString(value);key = "foo@bar";                                ❺ Second entry
value = "Foo Bar Baz";
buffer.appendInt(key.length()).appendString(key).appendInt(value.length()).appendString(value);file.end(buffer, ar -> vertx.close());

Listing 4.22 Writing a sample database to a file

在这个例子中,我们的数据很少,所以我们使用了一个在写入文件之前就已完全准备好的缓冲,但是我们同样可以为头【header】使用一个缓冲,为每个键/值条目【key/value entry】使用新的缓冲。

写很容易,但是读出来又如何呢?RecordParser有趣的特性是它的解析模式可以动态切换。我们可以一开始解析固定大小为5的缓冲区,然后切换到基于制表符的解析,然后是12字节的块,以此类推。

一个更好地表达解析逻辑的方式是将其分解为方法,每个方法对应于一个解析状态:一个解析数据库名称(Name)的方法,一个解析值项的方法,等等。

下面的代码清单中打开了我们之前编写的文件,并将RecordParser对象置于固定模式,因为我们正在寻找一个四个字节的序列,该序列表示魔数。所以当读取魔数时,将调用我们安装的处理程序。

AsyncFile file = vertx.fileSystem().openBlocking("sample.db",new OpenOptions().setRead(true));RecordParser parser = RecordParser.newFixed(4, file);           ❶ 我们首先要读出这个魔数。
parser.handler(header -> readMagicNumber(header, parser));

Listing 4.23 Reading a database stream, step 1

下一个代码清单提供了其他方法的实现。

private static void readMagicNumber(Buffer header, RecordParser parser) {logger.info("Magic number: {}:{}:{}:{}", header.getByte(0), ➥ header.getByte(1), header.getByte(2), header.getByte(3));parser.handler(version -> readVersion(version, parser));
}private static void readVersion(Buffer header, RecordParser parser) {logger.info("Version: {}", header.getInt(0));parser.delimitedMode("\n");                            ❶ 解析器模式可以动态切换parser.handler(name -> readName(name, parser));
}private static void readName(Buffer name, RecordParser parser) {logger.info("Name: {}", name.toString());parser.fixedSizeMode(4);parser.handler(keyLength -> readKey(keyLength, parser));
}

Listing 4.24 Reading a database stream, step 2

readMagicNumber方法从缓冲中提取魔数的四个字节。我们知道缓冲正好是4个字节,因为解析器处于固定大小模式。

下一个提取的条目是数据库版本,它是一个整数,因此我们不必更改解析器模式,因为整数也是4个字节。提取完数据库版本后,readName方法切换到分隔符模式来提取数据库名称。

下面的代码清单用于提取键名、值长度和适当的值,finishEntry将解析器设置为查找整数并委托给readKey方法。

private static void readKey(Buffer keyLength, RecordParser parser) {parser.fixedSizeMode(keyLength.getInt(0));parser.handler(key -> readValue(key.toString(), parser));
}private static void readValue(String key, RecordParser parser) {parser.fixedSizeMode(4);parser.handler(valueLength -> finishEntry(key, valueLength, parser));
}private static void finishEntry(String key, Buffer valueLength,
➥ RecordParser parser) {parser.fixedSizeMode(valueLength.getInt(0));parser.handler(value -> {logger.info("Key: {} / Value: {}", key, value);parser.fixedSizeMode(4);parser.handler(keyLength -> readKey(keyLength, parser));});
}

Listing 4.25 Reading a database stream, step 3

下面的清单显示了使用清单4.23到4.25的解析方法读取数据库文件时的一些日志输出:

DatabaseReader - Magic number: 1:2:3:4
DatabaseReader - Version: 2
DatabaseReader - Name: Sample database
DatabaseReader - Key: abc / Value: 123456-abcdef
DatabaseReader - Key: foo@bar / Value: Foo Bar Baz

Listing 4.26 Logs of reading the database stream

这些动态解析器模式和处理程序更改形成了一种非常简单但有效的方法来解析复杂流。

TIP 您可能想知道,在解析器已经从读流中获得了一些进一步的数据的情况下,解析模式如何能动态更改。请记住,我们处于一个事件循环中,因此解析器处理程序【parser handlers】一次处理一个解析器记录【parser records】。当我们从分隔符模式切换到固定大小模式时,下一条记录是通过基于字节数处理剩余的流数据,而不是基于查找字符串来发出的。当从固定大小模式切换到分隔符模式时,同样的推理也适用。

4.6 A quick note on the stream fetch mode

在我们结束这一章之前,让我们回到我故意忽略的ReadStream接口的细节。

从Vert.x3.6版本开始,引入了fetch模式,它允许流的消费者请求多个数据项,而不是流将数据项推送到消费者。这是通过暂停流,然后在需要数据时请求获取不同数量的项来实现的。

我们可以使用fetch模式重写点唱机文件流代码,但我们仍然需要一个计时器来指示速度。在本例中,手动读取4096字节的缓冲区或请求获取4096字节的缓冲区并没有什么不同。

相反,让我们回到数据库读取示例。清单4.23到4.25中均是基于读取流推送事件。切换到fetch模式并提取数据并不需要做太多更改。下面的代码清单展示了流初始化代码。

RecordParser parser = RecordParser.newFixed(4, file);
parser.pause();                                           ❶ 流不会推送事件。
parser.fetch(1);                                          ❷ 我们请求fetch一个元素(这里指的是一个缓冲)
parser.handler(header -> readMagicNumber(header, parser));

Listing 4.27 Putting a read stream in fetch mode

记住,RecordParser装饰了文件流。它被暂停,然后使用fetch方法请求一个元素。由于解析器会发出已解析数据的缓冲,因此在本例中请求一个元素意味着请求一个4字节(魔数)的缓冲。最终,将调用解析器处理程序【parser handler】来处理请求的缓冲区,在对fetch方法进行另一次调用之前,不会发生其他任何事情。

下面的代码清单展示了两个解析处理程序方法以及它们对fetch模式的适配:

private static void readMagicNumber(Buffer header, RecordParser parser) {logger.info("Magic number: {}:{}:{}:{}", header.getByte(0), ➥ header.getByte(1), header.getByte(2), header.getByte(3));parser.handler(version -> readVersion(version, parser));parser.fetch(1);                                           ❶ 这里的一项对应一个解析器记录
}
// (...)private static void finishEntry(String key, Buffer valueLength,
➥ RecordParser parser) {parser.fixedSizeMode(valueLength.getInt(0));parser.handler(value -> {logger.info("Key: {} / Value: {}", key, value);parser.fixedSizeMode(4);parser.handler(keyLength -> readKey(keyLength, parser));parser.fetch(1);});parser.fetch(1);
}

Listing 4.28 Fetching stream data as needed

这两种模式之间的唯一区别是,我们需要通过调用fetch来请求元素。在编写Vert.x应用程序时,您可能不需要使用fetch模式,但是如果您需要手动控制读流,那么它是一个非常有用的工具。

在许多情况下,您只需要推送数据即可,请求者可以在需要暂停时通过发出信号来管理背压。如果请求者更容易让发生器【source】知道它可以处理多少个条目,那么fetch数据是管理背压的更好选择。Vert.x streams 在这里非常灵活。

下一章将重点介绍除了使用Vert.x进行异步编程的回调之外的其他模型。

Summary

  1. Vert.x stream为异步事件流和数据流建模,它可以以push或fetch模式使用
  2. 背压管理对于确保异步系统之间事件的协调交换至关重要,我们通过音乐点唱机实例说明了这一点
  3. 流可以被解析为简单和复杂的数据,我们通过一个音频流媒体服务的网络控制接口来说明了这一点

Vert.x实战 异步数据和事件流相关推荐

  1. java 连接oracle_「事件驱动架构」使用GoldenGate创建从Oracle到Kafka的CDC事件流

    我们通过GoldenGate技术在Oracle DB和Kafka代理之间创建集成,该技术实时发布Kafka中的CDC事件流. Oracle在其Oracle GoldenGate for Big Dat ...

  2. 「事件驱动架构」使用GoldenGate创建从Oracle到Kafka的CDC事件流

    我们通过GoldenGate技术在Oracle DB和Kafka代理之间创建集成,该技术实时发布Kafka中的CDC事件流. Oracle在其Oracle GoldenGate for Big Dat ...

  3. oracle一列中间加一个字_「首席看架构」用GoldenGate创建从Oracle到Kafka的CDC事件流...

    我们通过GoldenGate技术在Oracle DB和Kafka代理之间创建集成,该技术实时发布Kafka中的CDC事件流. Oracle在其Oracle GoldenGate for Big Dat ...

  4. 设计数据密集型应用——流处理(11)

    文章目录 1. 传递事件流 1.1 消息传递系统 1.1.1 直接从生产者传递给消费者 1.1.2 消息代理 1.1.3 消息代理与数据库对比 1.1.4 多个消费者 1.1.5 确认与重现传递 1. ...

  5. Vert.x实战 事件总线:Vert.x应用程序的主干

    本章包括: 什么是事件总线 如何在事件总线上拥有点对点通信[point-to-point].请求-应答通信[request-reply].发布/订阅通信[publish/subscribe] 用于在网 ...

  6. Spark项目实战:大数据实时流处理日志(非常详细)

    实战概览 一.实战内容 二.大数据实时流处理分析系统简介 1.需求 2.背景及架构 三.实战所用到的架构和涉及的知识 1.后端架构 2.前端框架 四.项目实战 1.后端开发实战 1.构建项目 2.引入 ...

  7. 后处理程序文件大小的变量_【每日一题】(17题)面试官问:JS中事件流,事件处理程序,事件对象的理解?...

    关注「松宝写代码」,精选好文,每日一题 作者:saucxs | songEagle 2020,实「鼠」不易 2021,「牛」转乾坤 风劲潮涌当扬帆,任重道远须奋蹄! 一.前言 2020.12.23 立 ...

  8. [译] 基于事件流构建的服务

    [译] 基于事件流构建的服务 摘要:本文属于原创,欢迎转载,转载请保留出处:https://github.com/jasonGeng88/blog 原文:https://www.confluent.i ...

  9. 什么是大数据「实时流计算」?深度解析它的4大应用及4个特点

    导读:火灾已经爆发后才知道救火,交通已经阻塞后才知道疏通,羊毛已经被"羊毛党"薅光后才知道堵上漏洞,股价已经拉升后才知道后悔--为什么我们不能在这些事情发生之前,或者至少是刚刚发生 ...

最新文章

  1. 添加Net4CollectionTypeFactory的原因
  2. agc015F Kenus the Ancient Greek
  3. iOS程序的启动过程介绍
  4. QT的QHoverEvent类的使用
  5. 爬取前尘无忧python职位信息并保存到mongo数据库
  6. RabbitMQ(1) - win+rabbitMQ
  7. 用TortoiseGit时的实用git命令
  8. 大数据分析必须要会的统计分析!!!
  9. 动态设置全屏、取消全屏的方法,以及切换全屏保持内容位置不变的方法
  10. ubuntu安装visio2010_ubuntu 一个Linux脚本搞定常用软件的安装
  11. 03-JVM内存分配机制详解
  12. 让你的Windows系统时光倒流
  13. 关于字长、内存空间、地址总线、数据总线的理解
  14. Java核心技术整理(九)---持久层、业务层、表现层
  15. 黑白方格画C++解答
  16. ❤️React Hooks⭐
  17. Poi 自定义封装方法 合并excel中的单元格
  18. 面试时遇到『看门狗』脖子上挂着『时间轮』,我就问你怕不怕?
  19. 健康跑@长沙城(上)
  20. 弃掉Android 4.4获取系统图片出错之坑,实现 自定义相册库

热门文章

  1. linux 检测u盘 好坏,u盘质量检测最佳方法,u盘质量好坏检测方法
  2. python做excel数据分析带gui_Python进阶量化交易专栏场外篇25-GUI工具实现excel功能...
  3. 1123: [POI2008]BLO
  4. 视频去水印、文案提取和智能配音,视频搬运合成速成教程,超简单
  5. 电脑快捷键全都在这了!电脑技巧收藏!
  6. 一文openpose姿态估计
  7. 【JDK配置】雀氏纸尿裤,天才第一步
  8. 【CVE-2021-4034】 漏洞详细原理以及复现,polkit的pkexec中的本地提权漏洞
  9. postfix 邮箱设置及常见错误
  10. 夜神模拟器——vscode调试模拟器找不到模拟器