Flink 异步I/O

1.概述

在与外部系统交互(用数据库中的数据扩充流数据)的时候,需要考虑与外部系统的通信延迟对整个流处理应用的影响。

简单地访问外部数据库的数据,比如使用 MapFunction,通常意味着同步交互: MapFunction 向数据库发送一个请求然后一直等待,直到收到响应。在许多情况下,等待占据了函数运行的大部分时间。

与数据库异步交互是指一个并行函数实例可以并发地处理多个请求和接收多个响应。这样,函数在等待的时间可以发送其他请求和接收其他响应。至少等待的时间可以被多个请求摊分。大多数情况下,异步交互可以大幅度提高流处理的吞吐量。

[外链图片转存失败,源站可能有防盗链机制,建议将图片保存下来直接上传(img-wMw9rtN2-1651062884827)(assets/async_io.svg)]

*注意:*仅仅提高 MapFunction 的并行度(parallelism)在有些情况下也可以提升吞吐量,但是这样做通常会导致非常高的资源消耗:更多的并行 MapFunction 实例意味着更多的 Task、更多的线程、更多的 Flink 内部网络连接、 更多的与数据库的网络连接、更多的缓冲和更多程序内部协调的开销。

2.异步IO API

Flink 的异步 I/O API 允许用户在流处理中使用异步请求客户端。API 处理与数据流的集成,同时还能处理好顺序、事件时间和容错等。

在具备异步数据库客户端的基础上,实现数据流转换操作与数据库的异步 I/O 交互需要以下三部分:

  • 实现分发请求的 AsyncFunction
  • 获取数据库交互的结果并发送给 ResultFuture回调 函数
  • 将异步 I/O 操作应用于 DataStream 作为 DataStream 的一次转换操作。

下面是基本的代码模板:

// 这个例子使用 Java 8 的 Future 接口(与 Flink 的 Future 相同)实现了异步请求和回调。/*** 实现 'AsyncFunction' 用于发送请求和设置回调。*/
class AsyncDatabaseRequest extends RichAsyncFunction<String, Tuple2<String, String>> {/** 能够利用回调函数并发发送请求的数据库客户端 */private transient DatabaseClient client;@Overridepublic void open(Configuration parameters) throws Exception {client = new DatabaseClient(host, post, credentials);}@Overridepublic void close() throws Exception {client.close();}@Overridepublic void asyncInvoke(String key, final ResultFuture<Tuple2<String, String>> resultFuture) throws Exception {// 发送异步请求,接收 future 结果final Future<String> result = client.query(key);// 设置客户端完成请求后要执行的回调函数// 回调函数只是简单地把结果发给 futureCompletableFuture.supplyAsync(new Supplier<String>() {@Overridepublic String get() {try {return result.get();} catch (InterruptedException | ExecutionException e) {// 显示地处理异常。return null;}}}).thenAccept( (String dbResult) -> {resultFuture.complete(Collections.singleton(new Tuple2<>(key, dbResult)));});}
}// 创建初始 DataStream
DataStream<String> stream = ...;// 应用异步 I/O 转换操作
DataStream<Tuple2<String, String>> resultStream =AsyncDataStream.unorderedWait(stream, new AsyncDatabaseRequest(), 1000, TimeUnit.MILLISECONDS, 100);

重要提示: 第一次调用 ResultFuture.completeResultFuture 就完成了。 后续的 complete 调用都将被忽略。

下面两个参数控制异步操作:

  • Timeout: 超时参数定义了异步请求发出多久后未得到响应即被认定为失败。 它可以防止一直等待得不到响应的请求。
  • Capacity: 容量参数定义了可以同时进行的异步请求数。 即使异步 I/O 通常带来更高的吞吐量,执行异步 I/O 操作的算子仍然可能成为流处理的瓶颈。 限制并发请求的数量可以确保算子不会持续累积待处理的请求进而造成积压,而是在容量耗尽时触发反压。

3.超时处理

当异步 I/O 请求超时的时候,默认会抛出异常并重启作业。 如果你想处理超时,可以重写 AsyncFunction#timeout 方法。

4.结果的顺序

AsyncFunction 发出的并发请求经常以不确定的顺序完成,这取决于请求得到响应的顺序。 Flink 提供两种模式控制结果记录以何种顺序发出。

  • 无序模式: 异步请求一结束就立刻发出结果记录。 流中记录的顺序在经过异步 I/O 算子之后发生了改变。 当使用 处理时间 作为基本时间特征时,这个模式具有最低的延迟和最少的开销。 此模式使用 AsyncDataStream.unorderedWait(...) 方法。
  • 有序模式: 这种模式保持了流的顺序。发出结果记录的顺序与触发异步请求的顺序(记录输入算子的顺序)相同。为了实现这一点,算子将缓冲一个结果记录直到这条记录前面的所有记录都发出(或超时)。由于记录或者结果要在 checkpoint 的状态中保存更长的时间,所以与无序模式相比,有序模式通常会带来一些额外的延迟和 checkpoint 开销。此模式使用 AsyncDataStream.orderedWait(...) 方法。

5.事件时间

当流处理应用使用事件时间时,异步 I/O 算子会正确处理 watermark。对于两种顺序模式,这意味着以下内容:

  • 无序模式: Watermark 既不超前于记录也不落后于记录,即 watermark 建立了顺序的边界。 只有连续两个 watermark 之间的记录是无序发出的。 在一个 watermark 后面生成的记录只会在这个 watermark 发出以后才发出。 在一个 watermark 之前的所有输入的结果记录全部发出以后,才会发出这个 watermark。

    这意味着存在 watermark 的情况下,无序模式 会引入一些与有序模式 相同的延迟和管理开销。开销大小取决于 watermark 的频率。

  • 有序模式: 连续两个 watermark 之间的记录顺序也被保留了。开销与使用处理时间 相比,没有显著的差别。

请记住,摄入时间 是一种特殊的事件时间,它基于数据源的处理时间自动生成 watermark。

6.容错保证

异步 I/O 算子提供了完全的精确一次容错保证。它将在途的异步请求的记录保存在 checkpoint 中,在故障恢复时重新触发请求。

7.实现提示

在实现使用 Executor(或者 Scala 中的 ExecutionContext)和回调的 Futures 时,建议使用 DirectExecutor,因为通常回调的工作量很小,DirectExecutor 避免了额外的线程切换开销。回调通常只是把结果发送给 ResultFuture,也就是把它添加进输出缓冲。从这里开始,包括发送记录和与 chenkpoint 交互在内的繁重逻辑都将在专有的线程池中进行处理。

DirectExecutor 可以通过 org.apache.flink.runtime.concurrent.Executors.directExecutor()com.google.common.util.concurrent.MoreExecutors.directExecutor() 获得。

8.警告

Flink 不以多线程方式调用 AsyncFunction

我们想在这里明确指出一个经常混淆的地方:AsyncFunction 不是以多线程方式调用的。 只有一个 AsyncFunction 实例,它被流中相应分区内的每个记录顺序地调用。除非 asyncInvoke(...) 方法快速返回并且依赖于(客户端的)回调, 否则无法实现正确的异步 I/O。

例如,以下情况导致阻塞的 asyncInvoke(...) 函数,从而使异步行为无效:

  • 使用同步数据库客户端,它的查询方法调用在返回结果前一直被阻塞。
  • asyncInvoke(...) 方法内阻塞等待异步客户端返回的 future 类型对象

目前,出于一致性的原因,AsyncFunction 的算子(异步等待算子)必须位于算子链的头部

户端,它的查询方法调用在返回结果前一直被阻塞。

  • asyncInvoke(...) 方法内阻塞等待异步客户端返回的 future 类型对象

目前,出于一致性的原因,AsyncFunction 的算子(异步等待算子)必须位于算子链的头部

根据 FLINK-13063 给出的原因,目前我们必须断开异步等待算子的算子链以防止潜在的一致性问题。这改变了先前支持的算子链的行为。需要旧有行为并接受可能违反一致性保证的用户可以实例化并手工将异步等待算子添加到作业图中并将链策略设置回通过异步等待算子的 ChainingStrategy.ALWAYS 方法进行链接。

Flink 异步IO相关推荐

  1. flink 异步io使用

    [4]中提到了使用异步IO的场景 需要异步I / O操作 当与外部系统交互时(例如,当使用存储在数据库中的数据来丰富流事件时),需要注意与外部系统的通信延迟不会影响流应用程序的整体工作. 直接访问外部 ...

  2. Flink 异步IO时 java.util.concurrent.TimeoutException: Async function call has timed out.

    Flink 异步IO时 timeout报错 java.lang.Exception: An async function call terminated with an exception. Fail ...

  3. Flink 异步IO访问外部数据(mysql篇)

    接上篇:[翻译]Flink 异步I / O访问外部数据 最近看了大佬的博客,突然想起Async I/O方式是Blink 推给社区的一大重要功能,可以使用异步的方式获取外部数据,想着自己实现以下,项目上 ...

  4. flink 异步io mysql 缓存_Flink用于外部数据访问的异步I/O

    本页阐述了使用Flink的API来进行外部数据存储的异步I/O,对于不熟悉异步或者事件驱动编程的用户,一篇关于Future和事件驱动编程可能会很有用. 注意:关于异步I/O的详细设计和实现可以在异步I ...

  5. Flink异步io应用场景之流表join维表

    简介 维度表,作为数据仓库里面的概念,是维度属性的集合,比如时间维.地点维:可以是一个mysql或者cassandra,redis等存储,甚至是自己定义的一些api. 流表是kafka等流式数据. 根 ...

  6. 2021年大数据Flink(四十六):扩展阅读 异步IO

    目录 扩展阅读  异步IO 介绍 异步IO操作的需求 使用Aysnc I/O的前提条件 Async I/O API 案例演示 扩展阅读 原理深入 AsyncDataStream 消息的顺序性 扩展阅读 ...

  7. Flink(54):Flink高级特性之异步IO(Async I/O)

    目录 0. 相关文章链接 1. 异步IO概述 1.1. 异步IO操作的需求 1.2. 使用Aysnc I/O的前提条件 1.3. Async I/O API 2. 案例展示 2.1. 需求 2.2. ...

  8. 实战:Flink1.12异步IO访问外部数据-Mysql

    微信公众号:大数据开发运维架构 关注可了解更多大数据相关的资讯.问题或建议,请公众号留言; 如果您觉得"大数据开发运维架构"对你有帮助,欢迎转发朋友圈 从微信公众号拷贝过来,格式有 ...

  9. flink批处理访问mysql_Flink 异步IO访问外部数据(mysql篇)

    最近看了大佬的博客,突然想起Async I/O方式是Blink 推给社区的一大重要功能,可以使用异步的方式获取外部数据,想着自己实现以下,项目上用的时候,可以不用现去找了. 最开始想用scala 实现 ...

  10. Flink SQL中使用异步io关联维表

    目录 一.目标 二.方案 2.1. 自己实现demo: 2.2. 社区实现: 一.目标 希望使用flink sql来关联维度表,但是想用异步IO的方式关联. 二.方案 当前社区进展:目前Flink S ...

最新文章

  1. Science | 机器学习揭示了构建人造蛋白质的秘诀
  2. 深度学习贝叶斯,这是一份密集的6天速成课程(附视频与PPT)
  3. python内置数据结构教程_python课程第二周 内置数据结构——列表和元组
  4. 1.我和python的第一次亲密接触
  5. [机器学习] 推荐系统之协同过滤算法(转)
  6. linux中的man文档结构
  7. 开发实践丨用小熊派STM32开发板模拟自动售货机
  8. STM32F103:三.(1)步进电机
  9. 容器转换类型,列表,集合,字典推导式
  10. HTML示例06---段落(原格式标记)
  11. 猎豹MFC--图片控件CStatic和动画控件CAnimateCtrl
  12. Planetside.Software.Terragen.v0.9.43.WinALL 1CD(景观产生器)
  13. activiti6教程四
  14. 通过网线实现两台PC相互通信(并实现访问公网)
  15. group by column中的column与前面条件发生的错误的解决方案
  16. 转载:做人开心最重要
  17. 电子元器件封装获取方法
  18. 虚拟串口工具VSPD的使用
  19. 销售不愿意用企业微信怎么办?
  20. 重启计算机按哪几个键,电脑快捷重启按什么键

热门文章

  1. 2G到5G系统的横向比较(1)多址方式与调制方式
  2. 地图采集商家,附近商家,最新企业信息采集软件的使用教程
  3. 网页自动关机代码HTML,电脑怎么设置自动关机时间
  4. 带圈圈的数字1~50,求50以上,不要word的
  5. Android 反编译三种方式
  6. Windows远程桌面单/多用户同时登录
  7. cmos和ttl_ttl和cmos的区别
  8. oracle plsql破解
  9. android imageview 获取bitmap缩放大小,android – Imageview缩放方法“centercrop”作为代码...
  10. awvs无法启动问题