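I recently read an expert's blog post and remembered that Async I/O is one of the major features Blink contributed to the community: it lets Flink fetch external data asynchronously. I wanted to implement it myself so that I would have a working version ready for projects instead of having to hunt for one later.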

At first I wanted to write a Scala demo that reads HBase data, based on the demo in the official docs:

/**
 * An implementation of the 'AsyncFunction' that sends requests and sets the callback.
 */
class AsyncDatabaseRequest extends AsyncFunction[String, (String, String)] {

  /** The database specific client that can issue concurrent requests with callbacks */
  lazy val client: DatabaseClient = new DatabaseClient(host, port, credentials)

  /** The context used for the future callbacks */
  implicit lazy val executor: ExecutionContext = ExecutionContext.fromExecutor(Executors.directExecutor())

  override def asyncInvoke(str: String, resultFuture: ResultFuture[(String, String)]): Unit = {
    // issue the asynchronous request, receive a future for the result
    val resultFutureRequested: Future[String] = client.query(str)

    // set the callback to be executed once the request by the client is complete
    // the callback simply forwards the result to the result future
    resultFutureRequested.onSuccess {
      case result: String => resultFuture.complete(Iterable((str, result)))
    }
  }
}

// create the original stream
val stream: DataStream[String] = ...

// apply the async I/O transformation
val resultStream: DataStream[(String, String)] =
  AsyncDataStream.unorderedWait(stream, new AsyncDatabaseRequest(), 1000, TimeUnit.MILLISECONDS, 100)

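This failed. I could not get the parts I had marked in red to work:

1. I could not find a usable implementation class for Future.

2. unorderedWait kept reporting errors.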
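For the first problem: in Java, the standard library's `CompletableFuture` is a ready-made `Future` implementation that supports callbacks. The following is a minimal plain-Java sketch of the pattern the Scala demo uses (issue the request, then forward the result from a callback). `ResultSink` is a hypothetical stand-in for Flink's `ResultFuture`, and `slowQuery` is a made-up placeholder for the database client, so the sketch runs without Flink or a database.

```java
import java.util.AbstractMap;
import java.util.Collection;
import java.util.Collections;
import java.util.Map;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

class CallbackForwarding {
    // Hypothetical stand-in for Flink's ResultFuture: receives either results or an error.
    interface ResultSink<T> {
        void complete(Collection<T> result);
        void completeExceptionally(Throwable error);
    }

    // Placeholder for the database client; a real client would run the query here.
    static String slowQuery(String key) {
        return "value-of-" + key;
    }

    // Run the query on the pool, then forward the outcome to the sink from the callback.
    static void asyncInvoke(String key, ResultSink<Map.Entry<String, String>> sink, ExecutorService pool) {
        CompletableFuture
                .supplyAsync(() -> slowQuery(key), pool)
                .whenComplete((value, error) -> {
                    if (error != null) {
                        sink.completeExceptionally(error);
                    } else {
                        Map.Entry<String, String> pair = new AbstractMap.SimpleImmutableEntry<>(key, value);
                        sink.complete(Collections.singletonList(pair));
                    }
                });
    }

    // Adapts a CompletableFuture into a ResultSink so a caller can wait for the result.
    static <T> ResultSink<T> sinkInto(CompletableFuture<Collection<T>> target) {
        return new ResultSink<T>() {
            @Override
            public void complete(Collection<T> result) { target.complete(result); }

            @Override
            public void completeExceptionally(Throwable error) { target.completeExceptionally(error); }
        };
    }

    public static void main(String[] args) {
        ExecutorService pool = Executors.newFixedThreadPool(2);
        CompletableFuture<Collection<Map.Entry<String, String>>> result = new CompletableFuture<>();
        asyncInvoke("42", sinkInto(result), pool);
        System.out.println(result.join()); // prints [42=value-of-42]
        pool.shutdown();
    }
}
```

The key point is that `asyncInvoke` returns right after handing the work to the pool; the result reaches the sink later, from whichever thread completes the future.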

The examples in the Flink source code also include a Scala example:

def main(args: Array[String]) {
  val timeout = 10000L

  val env = StreamExecutionEnvironment.getExecutionEnvironment

  val input = env.addSource(new SimpleSource())

  val asyncMapped = AsyncDataStream.orderedWait(input, timeout, TimeUnit.MILLISECONDS, 10) {
    (input, collector: ResultFuture[Int]) =>
      Future {
        collector.complete(Seq(input))
      } (ExecutionContext.global)
  }

  asyncMapped.print()

  env.execute("Async I/O job")
}

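That is the core of it. As a beginner I got stuck here: I wanted to extend RichAsyncFunction so that I could initialize the connection in its open method.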

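I went through quite a few blog posts, but most of them just translate the explanation from the official docs, and none of their examples actually runs, which was frustrating.

So the Scala attempt failed.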

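So I switched to Java. Yesterday I asked in a chat group and an expert sent me a Java version: https://github.com/perkinls/flink-local-train/blob/c8b4efe33620352aea0100adef4fae2a068a3b65/src/main/scala/com/lp/test/asyncio/AsyncIoSideTableJoinMysqlJava.java . I haven't read it yet, because I can already follow the Java example in the official docs.

Below is the source code of the MySQL version (the HBase version is not tested yet, because my local HBase is down):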

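The business logic:

Consume data from Kafka, convert each record to a user object, call the async function to look up the phone number by user.id, set it back on the user object, and output it.

Main class: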

import com.alibaba.fastjson.JSON;
import com.venn.common.Common;
import org.apache.flink.formats.json.JsonNodeDeserializationSchema;
import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.node.ObjectNode;
import org.apache.flink.streaming.api.datastream.AsyncDataStream;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer;

import java.util.concurrent.TimeUnit;

public class AsyncMysqlRequest {

    public static void main(String[] args) throws Exception {
        final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

        FlinkKafkaConsumer<ObjectNode> source =
                new FlinkKafkaConsumer<>("async", new JsonNodeDeserializationSchema(), Common.getProp());

        // consume Kafka data and convert each record to an AsyncUser
        // (assumes AsyncUser has an (id, username, password) constructor, like User)
        DataStream<AsyncUser> input = env.addSource(source).map(value -> {
            String id = value.get("id").asText();
            String username = value.get("username").asText();
            String password = value.get("password").asText();
            return new AsyncUser(id, username, password);
        });

        // async I/O lookup in MySQL: timeout 1000 ms (1 s), capacity 10
        // (at most 10 requests in flight; further records backpressure the upstream operator)
        DataStream<AsyncUser> async = AsyncDataStream.unorderedWait(
                input, new AsyncFunctionForMysqlJava(), 1000, TimeUnit.MILLISECONDS, 10);

        async.map(user -> JSON.toJSON(user).toString())
                .print();

        env.execute("asyncForMysql");
    }
}
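Two arguments of unorderedWait are easy to get wrong. First, the unit of the timeout: `1000` with `TimeUnit.MICROSECONDS` is only 1 ms, while `1000` with `TimeUnit.MILLISECONDS` is the intended 1 s. Second, the capacity (10 here) caps the number of requests in flight, and the caller blocks once the cap is reached, which is how backpressure reaches the upstream operator. The sketch below checks the unit conversion with the standard `TimeUnit` API and models the capacity with a `Semaphore`; `BoundedInvoker` is a hypothetical illustration, not Flink's internal implementation.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Semaphore;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.function.Function;

// Hypothetical model of the timeout unit and the capacity limit; not Flink's implementation.
class BoundedInvoker<I, O> {
    private final Semaphore permits;                       // one permit per in-flight request
    private final AtomicInteger inFlight = new AtomicInteger();
    private final AtomicInteger maxSeen = new AtomicInteger();
    private final ExecutorService pool;
    private final Function<I, O> query;

    BoundedInvoker(int capacity, ExecutorService pool, Function<I, O> query) {
        this.permits = new Semaphore(capacity);
        this.pool = pool;
        this.query = query;
    }

    // Blocks the caller while `capacity` requests are pending: this is the backpressure.
    CompletableFuture<O> invoke(I input) {
        permits.acquireUninterruptibly();
        maxSeen.accumulateAndGet(inFlight.incrementAndGet(), Math::max);
        return CompletableFuture.supplyAsync(() -> query.apply(input), pool)
                .whenComplete((result, error) -> {
                    inFlight.decrementAndGet();
                    permits.release();                     // free the slot for the next record
                });
    }

    int maxInFlight() {
        return maxSeen.get();
    }

    // The unit matters: 1000 MICROSECONDS is 1 ms, not 1 s.
    static long timeoutMillis(long timeout, TimeUnit unit) {
        return unit.toMillis(timeout);
    }

    public static void main(String[] args) {
        System.out.println(timeoutMillis(1000, TimeUnit.MICROSECONDS)); // prints 1
        System.out.println(timeoutMillis(1000, TimeUnit.MILLISECONDS)); // prints 1000

        ExecutorService pool = Executors.newFixedThreadPool(30);
        BoundedInvoker<Integer, Integer> invoker = new BoundedInvoker<>(10, pool, x -> x * 2);
        List<CompletableFuture<Integer>> results = new ArrayList<>();
        for (int i = 0; i < 100; i++) {
            results.add(invoker.invoke(i));
        }
        results.forEach(CompletableFuture::join);
        System.out.println("max in flight: " + invoker.maxInFlight()); // never above 10
        pool.shutdown();
    }
}
```

With a 1 ms timeout, almost every real database lookup would end up in `timeout()`, so checking the unit is worth doing whenever all records come back as timeouts.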

Function class:

import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.api.functions.async.ResultFuture;
import org.apache.flink.streaming.api.functions.async.RichAsyncFunction;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.util.Collections;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

public class AsyncFunctionForMysqlJava extends RichAsyncFunction<AsyncUser, AsyncUser> {

    private static final Logger logger = LoggerFactory.getLogger(AsyncFunctionForMysqlJava.class);

    private transient MysqlClient client;
    private transient ExecutorService executorService;

    /**
     * Initialize the client and the thread pool in open().
     *
     * @param parameters configuration
     * @throws Exception on failure
     */
    @Override
    public void open(Configuration parameters) throws Exception {
        logger.info("async function for mysql java open ...");
        super.open(parameters);
        client = new MysqlClient();
        executorService = Executors.newFixedThreadPool(30);
    }

    /**
     * Use asyncUser.getId() to look up the user's phone asynchronously.
     *
     * @param asyncUser    input record
     * @param resultFuture receives the enriched record
     * @throws Exception on failure
     */
    @Override
    public void asyncInvoke(AsyncUser asyncUser, ResultFuture<AsyncUser> resultFuture) throws Exception {
        executorService.submit(() -> {
            // submit query
            System.out.println("submit query : " + asyncUser.getId() + "-1-" + System.currentTimeMillis());
            try {
                AsyncUser tmp = client.query1(asyncUser);
                // always complete resultFuture, otherwise every record ends up as a timeout
                resultFuture.complete(Collections.singletonList(tmp));
            } catch (Exception e) {
                resultFuture.completeExceptionally(e);
            }
        });
    }

    @Override
    public void timeout(AsyncUser input, ResultFuture<AsyncUser> resultFuture) throws Exception {
        logger.warn("Async function for mysql timeout");
        input.setPhone("timeout");
        resultFuture.complete(Collections.singletonList(input));
    }

    /**
     * Shut down the thread pool in close().
     *
     * @throws Exception on failure
     */
    @Override
    public void close() throws Exception {
        logger.info("async function for mysql java close ...");
        if (executorService != null) {
            executorService.shutdown();
        }
        super.close();
    }
}
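The function class above creates its client and thread pool in `open()` and hands each lookup to the pool in `asyncInvoke()`. The sketch below models that lifecycle in plain Java so it runs without Flink; `PooledLookup` and its nested `Sink` interface are hypothetical names, not Flink API. It adds two details worth copying: the pool is shut down in `close()`, and a failed lookup is reported immediately with `completeExceptionally` instead of silently waiting for the timeout.

```java
import java.util.Collection;
import java.util.Collections;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CompletionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
import java.util.function.Function;

// Plain-Java model of the open / asyncInvoke / close lifecycle (hypothetical, no Flink needed).
class PooledLookup<I, O> {
    // Hypothetical stand-in for Flink's ResultFuture.
    interface Sink<T> {
        void complete(Collection<T> result);
        void completeExceptionally(Throwable error);
    }

    private final Function<I, O> blockingQuery;   // e.g. a JDBC lookup by id
    private ExecutorService executor;

    PooledLookup(Function<I, O> blockingQuery) {
        this.blockingQuery = blockingQuery;
    }

    // Like open(): create the thread pool once per parallel instance.
    void open(int threads) {
        executor = Executors.newFixedThreadPool(threads);
    }

    // Like asyncInvoke(): return immediately and always finish the sink, success or failure.
    void asyncInvoke(I input, Sink<O> sink) {
        CompletableFuture.supplyAsync(() -> blockingQuery.apply(input), executor)
                .whenComplete((value, error) -> {
                    if (error == null) {
                        sink.complete(Collections.singletonList(value));
                    } else {
                        // supplyAsync wraps the query's exception; report the original cause
                        Throwable cause = error instanceof CompletionException && error.getCause() != null
                                ? error.getCause() : error;
                        sink.completeExceptionally(cause);
                    }
                });
    }

    // Like close(): stop the worker threads so the task can shut down cleanly.
    void close() {
        executor.shutdown();
        try {
            executor.awaitTermination(10, TimeUnit.SECONDS);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
    }

    // Helper that exposes a Sink as a CompletableFuture the caller can wait on.
    static <T> Sink<T> sinkInto(CompletableFuture<Collection<T>> target) {
        return new Sink<T>() {
            @Override
            public void complete(Collection<T> result) { target.complete(result); }

            @Override
            public void completeExceptionally(Throwable error) { target.completeExceptionally(error); }
        };
    }

    public static void main(String[] args) {
        PooledLookup<String, String> lookup = new PooledLookup<>(id -> "phone-" + id);
        lookup.open(4);
        CompletableFuture<Collection<String>> result = new CompletableFuture<>();
        lookup.asyncInvoke("1", sinkInto(result));
        System.out.println(result.join()); // prints [phone-1]
        lookup.close();
    }
}
```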

MysqlClient:

import java.sql.Connection;
import java.sql.DriverManager;
import java.sql.PreparedStatement;
import java.sql.ResultSet;
import java.sql.SQLException;

public class MysqlClient {

    private static String jdbcUrl = "jdbc:mysql://192.168.229.128:3306?useSSL=false&allowPublicKeyRetrieval=true";
    private static String username = "root";
    private static String password = "123456";
    private static String driverName = "com.mysql.jdbc.Driver";

    private static Connection conn;
    private static PreparedStatement ps;

    static {
        try {
            Class.forName(driverName);
            conn = DriverManager.getConnection(jdbcUrl, username, password);
            ps = conn.prepareStatement("select phone from async.async_test where id = ?");
        } catch (ClassNotFoundException | SQLException e) {
            e.printStackTrace();
        }
    }

    /**
     * Execute the query.
     *
     * @param user user whose id is looked up
     * @return the same user with its phone set
     */
    public AsyncUser query1(AsyncUser user) {
        // simulate 10 ms of extra latency per query
        try {
            Thread.sleep(10);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }

        String phone = "0000";
        // the static PreparedStatement is shared by all pool threads, so
        // setString + executeQuery must not interleave between threads
        synchronized (MysqlClient.class) {
            try {
                ps.setString(1, user.getId());
                try (ResultSet rs = ps.executeQuery()) {
                    if (rs.next()) {
                        phone = rs.getString(1);
                    }
                }
                System.out.println("execute query : " + user.getId() + "-2-" + "phone : " + phone + "-" + System.currentTimeMillis());
            } catch (SQLException e) {
                e.printStackTrace();
            }
        }

        user.setPhone(phone);
        return user;
    }

    // test code
    public static void main(String[] args) {
        MysqlClient mysqlClient = new MysqlClient();
        AsyncUser asyncUser = new AsyncUser();
        asyncUser.setId("526");
        long start = System.currentTimeMillis();
        asyncUser = mysqlClient.query1(asyncUser);
        System.out.println("end : " + (System.currentTimeMillis() - start));
        System.out.println(asyncUser.toString());
    }
}
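MysqlClient keeps a single static connection and prepared statement, while the function class runs lookups on a pool of 30 threads. Calling `setString` and `executeQuery` on one shared statement from several threads can interleave, so one thread may read another thread's result; access has to be serialized, or each concurrent query needs its own connection. A connection pool gives the latter. The sketch below is a hypothetical minimal fixed-size pool built on `ArrayBlockingQueue`, only to show the idea; a real project would normally use a dedicated connection pool library.

```java
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.function.Function;
import java.util.function.Supplier;

// Hypothetical minimal fixed-size pool: each borrower gets exclusive use of one resource.
class SimplePool<R> {
    private final BlockingQueue<R> idle;

    SimplePool(int size, Supplier<R> factory) {
        idle = new ArrayBlockingQueue<>(size);
        for (int i = 0; i < size; i++) {
            idle.add(factory.get());   // e.g. open one JDBC connection per slot
        }
    }

    // Borrow a resource, run the work with it, and always hand it back.
    <T> T withResource(Function<R, T> work) {
        R resource;
        try {
            resource = idle.take();    // waits while every resource is in use
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException("interrupted while waiting for a resource", e);
        }
        try {
            return work.apply(resource);
        } finally {
            idle.add(resource);        // never exceeds capacity: only what was taken is returned
        }
    }

    public static void main(String[] args) {
        AtomicInteger counter = new AtomicInteger();
        SimplePool<String> pool = new SimplePool<>(2, () -> "conn-" + counter.getAndIncrement());
        System.out.println(pool.withResource(conn -> conn + " ran query")); // prints conn-0 ran query
    }
}
```

For MySQL, the factory would open a `java.sql.Connection` (wrapping the checked `SQLException`), and each lookup would prepare its own statement on the borrowed connection, so queries from different threads can run in parallel without sharing a statement.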

Function class (anti-pattern: asyncInvoke queries the database with a blocking call, so it is effectively synchronous):

import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.api.functions.async.ResultFuture;
import org.apache.flink.streaming.api.functions.async.RichAsyncFunction;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.sql.Connection;
import java.sql.DriverManager;
import java.sql.PreparedStatement;
import java.sql.ResultSet;
import java.util.Collections;

public class AsyncFunctionForMysqlJava extends RichAsyncFunction<User, User> {
    // connection settings
    private static String jdbcUrl = "jdbc:mysql://192.168.229.128:3306?useSSL=false";
    private static String username = "root";
    private static String password = "123456";
    private static String driverName = "com.mysql.jdbc.Driver";

    private transient Connection conn;
    private transient PreparedStatement ps;

    private static final Logger logger = LoggerFactory.getLogger(AsyncFunctionForMysqlJava.class);

    /**
     * Initialize the connection in open().
     *
     * @param parameters configuration
     * @throws Exception on failure
     */
    @Override
    public void open(Configuration parameters) throws Exception {
        logger.info("async function for mysql java open ...");
        super.open(parameters);
        Class.forName(driverName);
        conn = DriverManager.getConnection(jdbcUrl, username, password);
        ps = conn.prepareStatement("select phone from async.async_test where id = ?");
    }

    /**
     * Look up the user's phone by user.getId().
     * WRONG: executeQuery() blocks the operator thread, so requests run one at a time.
     *
     * @param user         input record
     * @param resultFuture receives the enriched record
     * @throws Exception on failure
     */
    @Override
    public void asyncInvoke(User user, ResultFuture<User> resultFuture) throws Exception {
        // query by user id
        ps.setString(1, user.getId());
        String phone = null;
        try (ResultSet rs = ps.executeQuery()) {
            if (rs.next()) {
                phone = rs.getString(1);
            }
        }
        user.setPhone(phone);
        // hand the result back to the result queue
        resultFuture.complete(Collections.singletonList(user));
    }

    @Override
    public void timeout(User input, ResultFuture<User> resultFuture) throws Exception {
        logger.info("Async function for mysql timeout");
        resultFuture.complete(Collections.singletonList(input));
    }

    /**
     * Close the connection in close().
     *
     * @throws Exception on failure
     */
    @Override
    public void close() throws Exception {
        logger.info("async function for mysql java close ...");
        super.close();
        if (conn != null) {
            conn.close();
        }
    }
}
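The difference between the two function classes can be reproduced without a database. In the sketch below (all names hypothetical), `slowLookup` waits on a `CountDownLatch` to stand in for a slow query. `blockingInvoke` runs it on the calling thread, like the anti-pattern above, so it cannot return before the query ends; `asyncInvoke` hands it to a pool and returns at once, which is what lets Async I/O keep several requests in flight.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

class BlockingVsAsync {
    // Simulated slow query: waits until the gate opens, like a lookup waiting on the network.
    static String slowLookup(String id, CountDownLatch gate) {
        try {
            gate.await();
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
        return "phone-" + id;
    }

    // Anti-pattern: the lookup runs on the calling (operator) thread, which is stuck until it ends.
    static CompletableFuture<String> blockingInvoke(String id, CountDownLatch gate) {
        return CompletableFuture.completedFuture(slowLookup(id, gate));
    }

    // Pattern: hand the lookup to a pool and return at once; the result arrives later.
    static CompletableFuture<String> asyncInvoke(String id, CountDownLatch gate, ExecutorService pool) {
        return CompletableFuture.supplyAsync(() -> slowLookup(id, gate), pool);
    }

    public static void main(String[] args) {
        ExecutorService pool = Executors.newFixedThreadPool(4);
        CountDownLatch gate = new CountDownLatch(1);
        CompletableFuture<String> pending = asyncInvoke("1", gate, pool);
        System.out.println("returned, done = " + pending.isDone()); // prints returned, done = false
        gate.countDown();                                            // let the "query" finish
        System.out.println(pending.join());                          // prints phone-1
        pool.shutdown();
    }
}
```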

Test data:

{"id" : 1, "username" : "venn", "password" : 1561709530935}

{"id" : 2, "username" : "venn", "password" : 1561709536029}

{"id" : 3, "username" : "venn", "password" : 1561709541033}

{"id" : 4, "username" : "venn", "password" : 1561709546037}

{"id" : 5, "username" : "venn", "password" : 1561709551040}

{"id" : 6, "username" : "venn", "password" : 1561709556044}

{"id" : 7, "username" : "venn", "password" : 1561709561048}

Output:

submit query : 1-1-1562763486845
submit query : 2-1-1562763486846
submit query : 3-1-1562763486846
submit query : 4-1-1562763486849
submit query : 5-1-1562763486849
submit query : 6-1-1562763486859
submit query : 7-1-1562763486913
submit query : 8-1-1562763486967
submit query : 9-1-1562763487021
execute query : 1-2-phone : 12345678910-1562763487316
1> {"password":"1562763486506","phone":"12345678910","id":"1","username":"venn"}
submit query : 10-1-1562763487408
submit query : 11-1-1562763487408
execute query : 9-2-phone : 1562661110630-1562763487633
1> {"password":"1562763487017","phone":"1562661110630","id":"9","username":"venn"}  # the async behavior is visible here: queries have been submitted up to id 11, but only 1 and 9 have executed, and 1 and 9 are what has been returned (unorderedWait)
submit query : 12-1-1562763487634
execute query : 8-2-phone : 1562661110627-1562763487932
1> {"password":"1562763486963","phone":"1562661110627","id":"8","username":"venn"}
submit query : 13-1-1562763487933
execute query : 7-2-phone : 1562661110624-1562763488228
1> {"password":"1562763486909","phone":"1562661110624","id":"7","username":"venn"}
submit query : 14-1-1562763488230
execute query : 6-2-phone : 1562661110622-1562763488526
1> {"password":"1562763486855","phone":"1562661110622","id":"6","username":"venn"}
submit query : 15-1-1562763488527
execute query : 4-2-phone : 12345678913-1562763488832
1> {"password":"1562763486748","phone":"12345678913","id":"4","username":"venn"}
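The output shows results coming back in completion order (1, 9, 8, 7, 6, 4) rather than input order, because the job uses `unorderedWait`. `orderedWait` instead holds finished results back until every earlier input has finished. The toy model below (hypothetical, not Flink's internal queue) completes three requests in the order 3, 1, 2 and records what each strategy would emit.

```java
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.Deque;
import java.util.List;
import java.util.concurrent.CompletableFuture;

// Toy model of unorderedWait vs orderedWait emission; not Flink's internal queue.
class EmitOrder {
    final List<String> unordered = new ArrayList<>();
    final List<String> ordered = new ArrayList<>();
    private final Deque<CompletableFuture<String>> pending = new ArrayDeque<>();

    // Register one request in input order and return the future its lookup will complete.
    synchronized CompletableFuture<String> submit() {
        CompletableFuture<String> request = new CompletableFuture<>();
        pending.addLast(request);
        request.thenAccept(this::onComplete);
        return request;
    }

    private synchronized void onComplete(String value) {
        unordered.add(value);   // unordered: emit as soon as any request finishes
        // ordered: emit only from the head, so a slow early request holds back later ones
        while (!pending.isEmpty() && pending.peekFirst().isDone()) {
            ordered.add(pending.pollFirst().join());
        }
    }

    public static void main(String[] args) {
        EmitOrder model = new EmitOrder();
        CompletableFuture<String> first = model.submit();
        CompletableFuture<String> second = model.submit();
        CompletableFuture<String> third = model.submit();
        // lookups finish in the order 3, 1, 2
        third.complete("3");
        first.complete("1");
        second.complete("2");
        System.out.println("unordered: " + model.unordered); // prints unordered: [3, 1, 2]
        System.out.println("ordered:   " + model.ordered);   // prints ordered:   [1, 2, 3]
    }
}
```

The trade-off follows directly: unordered mode has lower latency, while ordered mode preserves input order at the cost of buffering results behind slow requests.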

HBase, Redis and other stores can be accessed in a similar way.
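With HBase, Redis or similar stores, a thread pool is not always needed: if the client itself offers a non-blocking, callback-based API, the callback can complete a `CompletableFuture` directly. The sketch below assumes a hypothetical `CallbackClient` interface; a real client's async API will have its own names and signatures, so check its documentation.

```java
import java.util.concurrent.CompletableFuture;
import java.util.function.BiConsumer;

class CallbackAdapter {
    // Hypothetical callback-style client API; real non-blocking clients name this differently.
    interface CallbackClient {
        void get(String key, BiConsumer<String, Throwable> callback);
    }

    // Turn one callback-style call into a CompletableFuture; no extra thread pool is involved.
    static CompletableFuture<String> lookup(CallbackClient client, String key) {
        CompletableFuture<String> future = new CompletableFuture<>();
        client.get(key, (value, error) -> {
            if (error != null) {
                future.completeExceptionally(error);
            } else {
                future.complete(value);
            }
        });
        return future;
    }

    public static void main(String[] args) {
        // Fake client that answers on its own thread, as a real async driver would.
        CallbackClient fake = (key, callback) -> new Thread(() -> callback.accept("phone-" + key, null)).start();
        System.out.println(lookup(fake, "526").join()); // prints phone-526
    }
}
```

Inside a Flink `asyncInvoke`, the returned future would then be connected to the `ResultFuture`, for example by calling `complete` from a `thenAccept` callback and `completeExceptionally` on failure.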

Feel free to follow the Flink菜鸟 WeChat official account, which posts articles on Flink development from time to time.
