微信公众号:大数据开发运维架构

关注可了解更多大数据相关的资讯。问题或建议,请公众号留言;

如果您觉得“大数据开发运维架构”对你有帮助,欢迎转发朋友圈

从微信公众号拷贝过来,格式有些错乱,建议直接去公众号阅读


本文结合官网和网络资料,讲解 Flink 用于访问外部数据存储的异步 I/O API。对于不熟悉异步或者事件驱动编程的用户,建议先储备一些关于 Future 和事件驱动编程的知识。

Flink异步IO官方文档地址:

https://ci.apache.org/projects/flink/flink-docs-release-1.13/zh/docs/dev/datastream/operators/asyncio/

我程序大体流程是,从socket流接收用户数据user(id,userNum,age三个字段),利用记录数据中的userNum字段,查询Mysql数据库用户姓名字典表userCode中的userName用户名,将查询出的userName字段,回填到user中,最后将回填的数据存到Mysql数据库的userInfo表中。

1.建表语句:

userCode(字典表)和userInfo表(目标表)对应DDL语句:

CREATE TABLE `userInfo` (`id` int(11) NOT NULL,`userNum` int(11) DEFAULT NULL,`userName` varchar(64) COLLATE utf8mb4_unicode_ci DEFAULT NULL,`age` int(11) DEFAULT NULL,PRIMARY KEY (`id`)
) ENGINE=InnoDB DEFAULT CHARSET=utf8mb4 COLLATE=utf8mb4_unicode_ci

CREATE TABLE `userCode` (`id` int(11) NOT NULL,`name` varchar(50) COLLATE utf8mb4_unicode_ci DEFAULT NULL,PRIMARY KEY (`id`)
) ENGINE=InnoDB DEFAULT CHARSET=utf8mb4 COLLATE=utf8mb4_unicode_ci

2.pom.xml文件,这里引入了阿里巴巴的druid


<dependencies><dependency><groupId>org.apache.flink</groupId><artifactId>flink-streaming-java_${scala.binary.version}</artifactId><version>${flink.version}</version></dependency><dependency><groupId>org.apache.flink</groupId><artifactId>flink-java</artifactId><version>${flink.version}</version></dependency><dependency><groupId>org.apache.flink</groupId><artifactId>flink-clients_${scala.binary.version}</artifactId><version>${flink.version}</version></dependency><dependency><groupId>org.apache.flink</groupId><artifactId>flink-connector-kafka_${scala.binary.version}</artifactId><version>${flink.version}</version></dependency><dependency><groupId>mysql</groupId><artifactId>mysql-connector-java</artifactId><version>5.1.44</version></dependency><dependency><groupId>com.alibaba</groupId><artifactId>druid</artifactId><version>1.1.20</version></dependency></dependencies>

3.代码整体结构如下:

主函数:FlinkMysqlAsyncIOMain

异步IO处理类,主要用于查询mysql:MysqlAsyncRichMapFunction

Sink端,将结果输出到mysql中:MysqlSinkFunction

数据库工具类:DBUtils

用户实体类:User


package com.hadoop.ljs.flink112.study.asyncIO;import org.apache.flink.streaming.api.datastream.AsyncDataStream;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import java.util.concurrent.TimeUnit;
/*** @author: Created By lujisen* @company China JiNan* @date: 2021-08-18 14:50* @version: v1.0* @description: com.hadoop.ljs.flink112.study.asyncIO*/
public class FlinkMysqlAsyncIOMain {public static void main(String[] args) throws Exception {int maxIORequestCount = 20; /*最大的异步请求数量*/StreamExecutionEnvironment senv = StreamExecutionEnvironment.getExecutionEnvironment();DataStream<String> socketDataStream = senv.socketTextStream("192.168.0.111", 9999);DataStream<User> userDataStream = AsyncDataStream.orderedWait(socketDataStream,new MysqlAsyncRichMapFunction(),   //自定义的Mysql异步处理类500000,                      //异步超时时间TimeUnit.MILLISECONDS,             //时间单位maxIORequestCount                  //最大异步并发请求数量);userDataStream.print();userDataStream.addSink(new MysqlSinkFunction<User>());/* userDataStream.print();*/senv.execute("FlinkMysqlAsyncIOMain");}
}
package com.hadoop.ljs.flink112.study.asyncIO;
import com.alibaba.druid.pool.DruidDataSource;
import java.sql.Connection;
import java.sql.PreparedStatement;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.api.functions.async.ResultFuture;
import org.apache.flink.streaming.api.functions.async.RichAsyncFunction;
import java.sql.ResultSet;
import java.sql.SQLException;
import java.util.Collections;
import java.util.concurrent.*;
import java.util.function.Supplier;
/*** @author: Created By lujisen* @company China JiNan* @date: 2021-08-18 10:25* @version: v1.0* @description: com.hadoop.ljs.flink112.study.asyncIO*/
public class MysqlAsyncRichMapFunction extends RichAsyncFunction<String,User> {/** 能够利用回调函数并发发送请求的数据库客户端,加上transient,不让其序列化 *//** 创建线程池、Mysql连接池 */@Overridepublic void open(Configuration parameters) throws Exception {super.open(parameters);}@Overridepublic void asyncInvoke(String line, ResultFuture<User> resultFuture) throws Exception {String[] split = line.split(",");User user = new User();user.setId(Integer.valueOf(split[0]));user.setUserNum(Integer.valueOf(split[1]));user.setAge(Integer.valueOf(split[2]));Future<String> dbResult = DBUtils.executorService.submit(new Callable<String>() {@Overridepublic String call() throws Exception {ResultSet resultSet=null;PreparedStatement statement=null;String sql = "SELECT id, name FROM userCode WHERE id = ?";String userName=null;if(user.getUserNum()==1001){System.out.println("当前getUserNum:"+user.getUserNum()+"开始睡眠30秒!!!");Thread.sleep(30000);}try {statement = DBUtils.getConnection().prepareStatement(sql);statement.setInt(1,user.getUserNum());resultSet = statement.executeQuery();while (resultSet.next()) {userName= resultSet.getString("name");}} finally {if (resultSet != null) {resultSet.close();}if (statement != null) {statement.close();}}return userName;}});CompletableFuture.supplyAsync(new Supplier<String>() {@Overridepublic String get() {try {return dbResult.get();} catch (InterruptedException | ExecutionException e) {// 显示地处理异常。return null;}}}).thenAccept( (String userName) -> {user.setUserName(userName);resultFuture.complete(Collections.singleton(user));});}@Overridepublic void close() throws Exception {super.close();}
}

package com.hadoop.ljs.flink112.study.asyncIO;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.api.functions.sink.RichSinkFunction;
import java.sql.PreparedStatement;
/*** @author: Created By lujisen* @company China JiNan* @date: 2021-08-18 15:02* @version: v1.0* @description: com.hadoop.ljs.flink112.study.asyncIO*/
public class MysqlSinkFunction<U> extends RichSinkFunction<User> {private static final String UPSERT_CASE = "INSERT INTO userInfo(id,userNum,userName,age) VALUES (?, ?,?,?)";private PreparedStatement statement=null;@Overridepublic void open(Configuration parameters) throws Exception {super.open(parameters);statement = DBUtils.getConnection().prepareStatement(UPSERT_CASE);}@Overridepublic void invoke(User user, Context context) throws Exception {statement.setInt(1,user.getId());statement.setInt(2, user.userNum);statement.setString(3, user.getUserName());statement.setInt(4, user.getAge());statement.addBatch();statement.executeBatch();}@Overridepublic void close() throws Exception {super.close();if(statement!=null){statement.close();}}
}
package com.hadoop.ljs.flink112.study.asyncIO;import com.alibaba.druid.pool.DruidDataSource;
import java.sql.Connection;
import java.sql.SQLException;
import java.util.concurrent.ExecutorService;
import static java.util.concurrent.Executors.newFixedThreadPool;
/*** @author: Created By lujisen* @company China JiNan* @date: 2021-08-18 15:19* @version: v1.0* @description: com.hadoop.ljs.flink112.study.asyncIO*/
public class DBUtils {private static String jdbcUrl = "jdbc:mysql://192.168.0.111:3306/lujisen?characterEncoding=utf8";private static String username = "root";private static String password = "123456a?";private static String driverName = "com.mysql.jdbc.Driver";public static Connection connection=null;public static transient ExecutorService executorService = null;public static transient DruidDataSource dataSource = null;/*连接池最大线程数*/private static int maxPoolConn=20;/*静态初始化*/static {//创建线程池executorService = newFixedThreadPool(maxPoolConn);dataSource=new DruidDataSource();dataSource.setDriverClassName(driverName);dataSource.setUsername(username);dataSource.setPassword(password);dataSource.setUrl(jdbcUrl);dataSource.setMaxActive(maxPoolConn);}public static Connection getConnection() throws SQLException {if(connection==null){connection= dataSource.getConnection();}return connection;}
}
package com.hadoop.ljs.flink112.study.asyncIO;
/*** @author: Created By lujisen* @company China JiNan* @date: 2021-08-18 10:13* @version: v1.0* @description: com.hadoop.ljs.flink112.study.asyncIO*/
public class User {int id;int userNum;String userName;int age;public User() {}public User(int id, int userNum, String userName, int age) {this.id = id;this.userNum = userNum;this.userName = userName;this.age = age;}public int getId() {return id;}public void setId(int id) {this.id = id;}public int getUserNum() {return userNum;}public void setUserNum(int userNum) {this.userNum = userNum;}public String getUserName() {return userName;}public void setUserName(String userName) {this.userName = userName;}public int getAge() {return age;}public void setAge(int age) {this.age = age;}@Overridepublic String toString() {return "User{" +"id=" + id +", userNum=" + userNum +", userName='" + userName + '\'' +", age=" + age +'}';}
}

说明:这里可忽略上面的多表数据,只发送后面的2条数据即可,Main函数中设置Flink异步IO超时时间是50s,而MysqlAsyncRichMapFunction文件asyncInvoke函数处理逻辑,当数据id为1时,进程休眠30秒,第一条数据阻塞执行,收到第二条数据不阻塞,直接处理第二条写入mysql数据库,30秒后,程序继续处理第一条数据,可观察Mysql数据表userInfo,看到第二条数据先入库,第一条数据30秒之后入库。


1,1001,23  第一条数据
2,1002,24  第二条数据

实战:Flink1.12异步IO访问外部数据-Mysql相关推荐

  1. Flink 异步IO访问外部数据(mysql篇)

    接上篇:[翻译]Flink 异步I / O访问外部数据 最近看了大佬的博客,突然想起Async I/O方式是Blink 推给社区的一大重要功能,可以使用异步的方式获取外部数据,想着自己实现以下,项目上 ...

  2. flink批处理访问mysql_Flink 异步IO访问外部数据(mysql篇)

    最近看了大佬的博客,突然想起Async I/O方式是Blink 推给社区的一大重要功能,可以使用异步的方式获取外部数据,想着自己实现以下,项目上用的时候,可以不用现去找了. 最开始想用scala 实现 ...

  3. 2021年大数据Flink(四十六):扩展阅读 异步IO

    目录 扩展阅读  异步IO 介绍 异步IO操作的需求 使用Aysnc I/O的前提条件 Async I/O API 案例演示 扩展阅读 原理深入 AsyncDataStream 消息的顺序性 扩展阅读 ...

  4. Fink异步IO的实战(关联维表)

    简介 异步io实战 知识前提 线程池异步io 应用程序 public class ASyncIODemo {public static void main(String[] args) throws ...

  5. 操作系统学习:Linux0.12文件异步IO

    本文参考书籍 1.操作系统真相还原 2.Linux内核完全剖析:基于0.12内核 3.x86汇编语言 从实模式到保护模式 4.Linux内核设计的艺术 ps:基于x86硬件的pc系统 Linux0.1 ...

  6. Flink教程(22)- Flink高级特性(异步IO)

    文章目录 01 引言 02 异步IO 2.1 异步IO介绍 2.2 使用Aysnc I/O的前提条件 2.3 Async I/O API 03 案例演示 04 原理深入 4.1 AsyncDataSt ...

  7. Flink(54):Flink高级特性之异步IO(Async I/O)

    目录 0. 相关文章链接 1. 异步IO概述 1.1. 异步IO操作的需求 1.2. 使用Aysnc I/O的前提条件 1.3. Async I/O API 2. 案例展示 2.1. 需求 2.2. ...

  8. vertx web连接超时 阻塞_Flink之基于Vertx的Mysql异步IO

    导读 在流计算中,如果以事件流为主,关联一些维度信息,就需要根据每个事件中的关键信息去数据库执行一次查询.正常的思路可能是通过mapFunction以阻塞的方式查询数据库,等待数据结果返回,然后执行下 ...

  9. PostgreSQL 10.1 手册_部分 II. SQL 语言_第 5 章 数据定义_5.11. 外部数据

    5.11. 外部数据 PostgreSQL实现了部分的SQL/MED规定,允许我们使用普通SQL查询来访问位于PostgreSQL之外的数据.这种数据被称为外部数据(注意这种用法不要和外键混淆,后者是 ...

最新文章

  1. Notification和KVO有什么不同
  2. HTSRealistic missions 10:Holy Word High School
  3. MYSQL数据文件--.frm文件(只有.frm文件时的表结构恢复)
  4. Haproxy+keepalived高可用代理服务
  5. React 万能的函数表达式
  6. 大数问题(高精度运算)
  7. linux 服务器账号及安全杂谈
  8. jquery 验证小数点后几位_(亲测可用)input只能输入数字或小数点后几位
  9. 用fiddler解决跨域访问
  10. Dockerfile 中的命令
  11. 发那科机器人注油_安川机器人加油保养流程
  12. C语言使用栈和队列实现停车场管理
  13. android 5.0设备 外接键盘 输入中文
  14. 三种视觉自动化检测的解决方案
  15. 假如明天失业了,我该去哪里
  16. Angr-CTF学习笔记11-13
  17. 面试题:数据库优化的方法
  18. android 弹幕框架DanmakuFlameMaster,解决 控制Ui和弹幕点击的问题.
  19. nodejs负载均衡(一):服务负载均衡
  20. 对我国师生数学学习和教学观念的反思 郇中丹教授

热门文章

  1. 英语计算机编程文档,计算机编程常用英语
  2. 影响 SEO 的排名优化的因素
  3. Phoenix 烟雾
  4. 部落冲突上链,币安领投Heroes of Mavia带来全新模式
  5. Pointcut注解表达式@target、@annotation、@within、this、target、within等
  6. Arcpy发布地图服务
  7. uniapp 小程序打开预览pdf文件
  8. P1262 间谍网络 (Tarjan 求强连通分量)
  9. java判断文件夹下是否存在文件_java 判断文件夹是否存在文件
  10. 在winlogon桌面显示窗口