本页阐述了使用Flink的API来进行外部数据存储的异步I/O,对于不熟悉异步或者事件驱动编程的用户,一篇关于Future和事件驱动编程可能会很有用。

注意:关于异步I/O的详细设计和实现可以在异步I/O设计和实现这篇文章找到。

异步I/O操作的需要

当与外部系统进行交互(例如使用存储在数据库中的数据丰富流事件)时, 需要注意的是, 与外部系统的通信延迟并不决定流应用程序的总体工作。

原始的访问外部系统中的数据,例如通过一个MapFunction来访问,通常意味着同步交互:将一个请求发送到数据库,MapFunction等待直到接收到响应为止。很多情况下,这种等待会占用很大一部分函数的时间。

与外部数据库系统进行异步交互意味着一个并行函数实例可以并发地处理多个请求和并发地接收多个响应。那样的话,等待时间就可以被其他的请求或者响应所覆盖。至少,等待时间可以被多个请求摊销,这在很多情况下会导致更高的流吞吐量。

注意:通过扩展MapFunction到一个很高的并发度来提高吞吐量在一定程度上是可行的,但是常常会导致很高的资源消耗:有很多的并行MapFunction实例意味着更多的任务、线程、Flink内部网络连接、与数据库之间的网络连接、缓存以及通常的内部开销。

前提

如上节所述,实现一个连接数据库(或者key/value存储系统)的正确异步I/O需要一个客户端,数据库支持通过该客户端来进行异步请求。许多流行的数据库都支持这种客户端。

对于没有这种客户端的情况下,用户可以将异步客户端换成一个可以通过创建多个客户端并使用线程池处理同步调用来尝试将同步客户端转换为有限的并发客户端。然而,这个方法通常比纯粹的异步客户端性能要低一些。

异步I/O API

Flink的Async I/O允许用户在数据流中使用异步的请求客户端,这个API会处理与数据流的交互,同时还处理顺序、事件时间、容错等。

假设已经目标数据库已经有了异步客户端,要实现一个通过异步I/O来操作数据库还需要三个步骤:

1、实现用来分发请求的AsyncFunction

2、获取操作结果的callback,并将它提交到AsyncCollector中

3、将异步I/O操作作为转换操作应用于DataStream中

下面代码展示了基本的模式:

// This example implements the asynchronous request and callback with Futures that have the

// interface of Java 8's futures (which is the same one followed by Flink's Future)

/**

* An implementation of the 'AsyncFunction' that sends requests and sets the callback.

*/

class AsyncDatabaseRequest extends RichAsyncFunction> {

/** The database specific client that can issue concurrent requests with callbacks */

private transient DatabaseClient client;

@Override

public void open(Configuration parameters) throws Exception {

client = new DatabaseClient(host, post, credentials);

}

@Override

public void close() throws Exception {

client.close();

}

@Override

public void asyncInvoke(final String str, final AsyncCollector> asyncCollector) throws Exception {

// issue the asynchronous request, receive a future for result

Future resultFuture = client.query(str);

// set the callback to be executed once the request by the client is complete

// the callback simply forwards the result to the collector

resultFuture.thenAccept( (String result) -> {

asyncCollector.collect(Collections.singleton(new Tuple2<>(str, result)));

});

}

}

// create the original stream

DataStream stream = ...;

// apply the async I/O transformation

DataStream> resultStream =

AsyncDataStream.unorderedWait(stream, new AsyncDatabaseRequest(), 1000, TimeUnit.MILLISECONDS, 100);

/**

* An implementation of the 'AsyncFunction' that sends requests and sets the callback.

*/

class AsyncDatabaseRequest extends AsyncFunction[String, (String, String)] {

/** The database specific client that can issue concurrent requests with callbacks */

lazy val client: DatabaseClient = new DatabaseClient(host, post, credentials)

/** The context used for the future callbacks */

implicit lazy val executor: ExecutionContext = ExecutionContext.fromExecutor(Executors.directExecutor())

override def asyncInvoke(str: String, asyncCollector: AsyncCollector[(String, String)]): Unit = {

// issue the asynchronous request, receive a future for the result

val resultFuture: Future[String] = client.query(str)

// set the callback to be executed once the request by the client is complete

// the callback simply forwards the result to the collector

resultFuture.onSuccess {

case result: String => asyncCollector.collect(Iterable((str, result)));

}

}

}

// create the original stream

val stream: DataStream[String] = ...

// apply the async I/O transformation

val resultStream: DataStream[(String, String)] =

AsyncDataStream.unorderedWait(stream, new AsyncDatabaseRequest(), 1000, TimeUnit.MILLISECONDS, 100)

重要提醒:AsyncCollector在第一次调用AsyncCollector.collect时就完成了,所有后续的collect调用都会被忽略。

下面的两个参数控制了异步操作:

****Timeout****:timeout定义了异步操作过了多长时间后会被丢弃,这个参数是防止了死的或者失败的请求

****Capacity****:这个参数定义了可以同时处理多少个异步请求,虽然异步I/O方法会带来更好的吞吐量,但是算子任然会成为流应用的瓶颈。限制并发请求的数量确保了算子不会积累不断增加的积压的待处理请求,但一旦容量耗尽,它将触发背压。

结果顺序

由AsyncFunction发出的并发请求经常是以无序的形式完成,取决于哪个请求先完成。为了控制发出请求结果的顺序,Flink提供了两种模式:

****Unordered****:结果记录在异步请求完成后就发出,流中的记录的顺序通过异步I/O操作后会与先前的不一致。当使用处理时间作为时间特性时这种模式具有低延迟、低消耗特点。通过AsyncDataStream.unorderedWait(...)来使用这种模式。

****Ordered****:在这种情况下,流的顺序是保留的,结果记录发出的顺利与异步请求触发的顺序(算子输入记录的顺序)一致。为了实现这一点,算子会将结果记录缓存起来直到所有的处理记录都被发出(或者超时)为止。这常常会导致一定程度的延迟和checkpoint消耗,因为跟非排序模式相比,记录或者结果会被长时间保存在checkpoint State中。通过AsyncDataStream.orderedWait(...)来使用这种模式。

事件时间

当使用流程序使用事件时间时,异步I/O操作将正确处理水印,这具体说明了如下两种模式:

****Unordered****:水印不会超过记录反之亦然,这也就意味着水印建立起了一个秩序边界。记录在两个水印间无序地发出。在一个水印后产生的记录只能在这个水印发出之后才能发出,同样水印也只能在所有水印之前的记录都发出之后才能发出。

****Ordered****:保存水印的顺序,就如保存记录之间的顺序一样。与处理时间相比,开销没有显著变化。

请记住,摄入时间是一个特殊的事件时间,会基于源处理时间的自动产生水印。

容错性保证

异步I/O操作提供了exactly-once容错性保证,它将异步请求的记录存储在checkpoint中,并在从故障中恢复时恢复/重新触发请求。

实施提示

警告

flink 异步io mysql 缓存_Flink用于外部数据访问的异步I/O相关推荐

  1. 业务异步写mysql数据库_把重要的业务日志异步批量写入数据库

    1. 把重要的业务日志异步批量写入数据库 配置文件示例: log4j.logger.business=INFO,db log4j.appender.db=org.apache.log4j.jdbc.J ...

  2. AIR-Android开发外部数据访问与存储

    http://blog.chinaunix.net/uid-10062010-id-3174877.html AIR程序大多需要在本地缓存一些数据,这些数据是否能正确存储关系到AIR是否真正可用,本文 ...

  3. sqlsugar mysql连接字符串_通用数据访问组件UniDAC最新版本v8.2.4,支持Lazarus中的macOS 64位...

    UniDAC(Universal Data Access Components)是一款通用数据库访问组件,提供了多个数据库的直接访问,如针对Windows的Delphi, C++Builder, La ...

  4. 实战:Flink1.12异步IO访问外部数据-Mysql

    微信公众号:大数据开发运维架构 关注可了解更多大数据相关的资讯.问题或建议,请公众号留言; 如果您觉得"大数据开发运维架构"对你有帮助,欢迎转发朋友圈 从微信公众号拷贝过来,格式有 ...

  5. Reactor模型和Proactor模型:同步IO与异步IO

    Table of Contents 服务端的线程模型 2种fd 3种事件 Reactor模型-同步I/O 1.单Reactor单线程模型 2.单Reactor多线程模型 3.主从Reactor多线程模 ...

  6. python 异步io_Python中的异步IO:完整的演练

    python 异步io Async IO is a concurrent programming design that has received dedicated support in Pytho ...

  7. 高性能异步IO机制:IO_URING

    高性能异步IO机制:IO_URING 一.前言 1.1 异步IO机制 Linux内核提供的IO机制大都是同步实现的,如常规的read/write/send/recv等系统调用.同步IO机制存在着一定的 ...

  8. Node的异步与java的异步_node中异步IO的理解

    解释性语言和编译型语言的区别: 计算器不能直接的理解高级语言,只能理解机器语言,所以必须把高级语言翻译为机器语言,翻译的方式有两种,一个是编译,一个是解释. 解释性语言的程序不需要编译,它是在运行程序 ...

  9. node中异步IO的理解

    解释性语言和编译型语言的区别: 计算器不能直接的理解高级语言,只能理解机器语言,所以必须把高级语言翻译为机器语言,翻译的方式有两种,一个是编译,一个是解释. 解释性语言的程序不需要编译,它是在运行程序 ...

最新文章

  1. gcc8之前,coredump文件无法显示正确的函数调用栈信息
  2. struts2 中文乱码问题
  3. 学而思的python课怎么样_有在用学而思网校的同学觉得孙墨漪老师怎么样?报她的课值得吗?...
  4. 【多线程基础】- 多个线程顺序打印ABC
  5. 1+X web中级 Laravel学习笔记——视图和模型
  6. eplan单线原理图多线原理图_EPLAN-黑盒-2
  7. Java匿名内部类里为什么能用外部变量
  8. MATLAB数据标准化处理,mapminmax、zscore、mapstd对比
  9. Matlab简单描点绘图
  10. Pandas中DataFrame数据的常用操作(创建、转置、查询、排序、缺失、运算、合并、追加、修改、分组、压缩等)
  11. [转]PKM-个人知识体系建设
  12. Multisim14仿真使用汇总
  13. 计算机培训日志范文30篇,班主任工作日志20篇.docx
  14. linux中文输入配置sougou输入法
  15. 图像修复序列——BSCB模型
  16. MTFCSGO准心设置
  17. Unity 最近经验分享
  18. 单片机蓝牙烧录_怎么样蓝牙模块给单片机烧程序?
  19. linux查看cpupower模式,cpupower命令 – 调整CPU主频
  20. ZEGO即构科技荣获36氪【WISE2020中国新经济之王最具影响力企业】

热门文章

  1. python 构造函数传参_C++和python混合编程之在python类构造函数中传参方法
  2. python从键盘输入一个数n、输出大于n且不能整除3_python基础练习题
  3. HDU1517 A Multiplication Game (博弈论+思维)
  4. java 绘制sin函数图像_第11讲 数学软件Mathematica内置函数的使用规则
  5. Tomcat和eclipse的整合
  6. R语言︱决策树族——随机森林算法
  7. R语言自动化报告格式——knitr
  8. jsp中文乱码问题 个人感觉比较有用
  9. Ansible入门使用
  10. VS2010 IDE安装问题