Fink异步IO的实战(关联维表)
简介
异步io实战
知识前提
线程池异步io
应用程序
public class ASyncIODemo {public static void main(String[] args) throws Exception {//1.envStreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();//2.Source//DataStreamSource[1,2,3,4,5]DataStreamSource<CategoryInfo> categoryDS = env.addSource(new RichSourceFunction<CategoryInfo>() {private Boolean flag = true;@Overridepublic void run(SourceContext<CategoryInfo> ctx) throws Exception {Integer[] ids = {1, 2, 3, 4, 5};for (Integer id : ids) {ctx.collect(new CategoryInfo(id, null));}}@Overridepublic void cancel() {this.flag = false;}});//方式一:线程池模拟异步IO//unorderedWait无序等待SingleOutputStreamOperator<CategoryInfo> result2 = AsyncDataStream.unorderedWait(categoryDS, new ASyncIOFunction2(), 1000, TimeUnit.SECONDS, 10);//打印结果result2.print();env.execute();}
}/*** 同步调用+线程池模拟异步IO*/
class ASyncIOFunction2 extends RichAsyncFunction<CategoryInfo, CategoryInfo> {private HashMap<Integer,String> dic_name=new HashMap<>();private ExecutorService executorService;//线程池@Overridepublic void open(Configuration parameters) throws Exception {super.open(parameters);dic_name.put(1,"手机");dic_name.put(2,"电脑");dic_name.put(3,"服装");executorService = new ThreadPoolExecutor(10, 10, 0L, TimeUnit.MILLISECONDS, new LinkedBlockingQueue<Runnable>());}//异步发送请求@Overridepublic void asyncInvoke(CategoryInfo input, ResultFuture<CategoryInfo> resultFuture) throws Exception {executorService.execute(new Runnable() {@Overridepublic void run() {//TODO 这里查询数据库String resName = dic_name.get(input.getId());input.setName(resName);resultFuture.complete(Collections.singletonList(input));}});}@Overridepublic void close() throws Exception {}@Overridepublic void timeout(CategoryInfo input, ResultFuture<CategoryInfo> resultFuture) throws Exception {System.out.println("async call time out!");input.setName("未知");resultFuture.complete(Collections.singleton(input));}
}
输出结果
1> CategoryInfo(id=4, name=null)
2> CategoryInfo(id=5, name=null)
15> CategoryInfo(id=2, name=电脑)
14> CategoryInfo(id=1, name=手机)
16> CategoryInfo(id=3, name=服装)
Java-vertx中提供的异步client实现异步IO
class ASyncIOFunction1 extends RichAsyncFunction<CategoryInfo, CategoryInfo> {private transient SQLClient mySQLClient;@Overridepublic void open(Configuration parameters) throws Exception {JsonObject mySQLClientConfig = new JsonObject();mySQLClientConfig.put("driver_class", "com.mysql.jdbc.Driver").put("url", "jdbc:mysql://localhost:3306/bigdata").put("user", "root").put("password", "root").put("max_pool_size", 20);VertxOptions options = new VertxOptions();options.setEventLoopPoolSize(10);options.setWorkerPoolSize(20);Vertx vertx = Vertx.vertx(options);//根据上面的配置参数获取异步请求客户端mySQLClient = JDBCClient.createNonShared(vertx, mySQLClientConfig);}//使用异步客户端发送异步请求@Overridepublic void asyncInvoke(CategoryInfo input, ResultFuture<CategoryInfo> resultFuture) throws Exception {mySQLClient.getConnection(new Handler<AsyncResult<SQLConnection>>() {@Overridepublic void handle(AsyncResult<SQLConnection> sqlConnectionAsyncResult) {if (sqlConnectionAsyncResult.failed()) {return;}SQLConnection connection = sqlConnectionAsyncResult.result();connection.query("select id,name from t_category where id = " +input.getId(), new Handler<AsyncResult<ResultSet>>() {@Overridepublic void handle(AsyncResult<io.vertx.ext.sql.ResultSet> resultSetAsyncResult) {if (resultSetAsyncResult.succeeded()) {List<JsonObject> rows = resultSetAsyncResult.result().getRows();for (JsonObject jsonObject : rows) {CategoryInfo categoryInfo = new CategoryInfo(jsonObject.getInteger("id"), jsonObject.getString("name"));resultFuture.complete(Collections.singletonList(categoryInfo));}}}});}});}@Overridepublic void close() throws Exception {mySQLClient.close();}@Overridepublic void timeout(CategoryInfo input, ResultFuture<CategoryInfo> resultFuture) throws Exception {System.out.println("async call time out!");input.setName("未知");resultFuture.complete(Collections.singleton(input));}
}
异步IO读取Redis数据
public static void main(String[] args) throws Exception {StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime);DataStreamSource<String> lines = env.readTextFile("data/input/city.txt");SingleOutputStreamOperator<String> result1 = AsyncDataStream.orderedWait(lines, new AsyncRedis(), 10, TimeUnit.SECONDS, 1);SingleOutputStreamOperator<String> result2 = AsyncDataStream.orderedWait(lines, new AsyncRedisByVertx(), 10, TimeUnit.SECONDS, 1);result1.print().setParallelism(1);result2.print().setParallelism(1);env.execute();}
}
/*** 使用异步的方式读取redis的数据*/
class AsyncRedis extends RichAsyncFunction<String, String> {//定义redis的连接池对象private JedisPoolConfig config = null;private static String ADDR = "localhost";private static int PORT = 6379;//等待可用连接的最大时间,单位是毫秒,默认是-1,表示永不超时,如果超过等待时间,则会抛出异常private static int TIMEOUT = 10000;//定义redis的连接池实例private JedisPool jedisPool = null;//定义连接池的核心对象private Jedis jedis = null;//初始化redis的连接@Overridepublic void open(Configuration parameters) throws Exception {super.open(parameters);//定义连接池对象属性配置config = new JedisPoolConfig();//初始化连接池对象jedisPool = new JedisPool(config, ADDR, PORT, TIMEOUT);//实例化连接对象(获取一个可用的连接)jedis = jedisPool.getResource();}@Overridepublic void close() throws Exception {super.close();if(jedis.isConnected()){jedis.close();}}//异步调用redis@Overridepublic void asyncInvoke(String input, ResultFuture<String> resultFuture) throws Exception {System.out.println("input:"+input);//发起一个异步请求,返回结果CompletableFuture.supplyAsync(new Supplier<String>() {@Overridepublic String get() {String[] arrayData = input.split(",");String name = arrayData[1];String value = jedis.hget("AsyncReadRedis", name);System.out.println("output:"+value);return value;}}).thenAccept((String dbResult)->{//设置请求完成时的回调,将结果返回resultFuture.complete(Collections.singleton(dbResult));});}//连接超时的时候调用的方法,一般在该方法中输出连接超时的错误日志,如果不重新该方法,连接超时后会抛出异常@Overridepublic void timeout(String input, ResultFuture<String> resultFuture) throws Exception {System.out.println("redis connect timeout!");}
}
/*** 使用高性能异步组件vertx实现类似于连接池的功能,效率比连接池要高* 1)在java版本中可以直接使用* 2)如果在scala版本中使用的话,需要scala的版本是2.12+*/
class AsyncRedisByVertx extends RichAsyncFunction<String,String> {//用transient关键字标记的成员变量不参与序列化过程private transient RedisClient redisClient;//获取连接池的配置对象private JedisPoolConfig config = null;//获取连接池JedisPool jedisPool = null;//获取核心对象Jedis jedis = null;//Redis服务器IPprivate static String ADDR = "localhost";//Redis的端口号private static int PORT = 6379;//访问密码private static String AUTH = "XXXXXX";//等待可用连接的最大时间,单位毫秒,默认值为-1,表示永不超时。如果超过等待时间,则直接抛出JedisConnectionException;private static int TIMEOUT = 10000;private static final Logger logger = LoggerFactory.getLogger(AsyncRedis.class);//初始化连接@Overridepublic void open(Configuration parameters) throws Exception {super.open(parameters);config = new JedisPoolConfig();jedisPool = new JedisPool(config, ADDR, PORT, TIMEOUT);jedis = jedisPool.getResource();RedisOptions config = new RedisOptions();config.setHost(ADDR);config.setPort(PORT);VertxOptions vo = new VertxOptions();vo.setEventLoopPoolSize(10);vo.setWorkerPoolSize(20);Vertx vertx = Vertx.vertx(vo);redisClient = RedisClient.create(vertx, config);}//数据异步调用@Overridepublic void asyncInvoke(String input, ResultFuture<String> resultFuture) throws Exception {System.out.println("input:"+input);String[] split = input.split(",");String name = split[1];// 发起一个异步请求redisClient.hget("AsyncReadRedis", name, res->{if(res.succeeded()){String result = res.result();if(result== null){resultFuture.complete(null);return;}else {// 设置请求完成时的回调: 将结果传递给 collectorresultFuture.complete(Collections.singleton(result));}}else if(res.failed()) {resultFuture.complete(null);return;}});}@Overridepublic void timeout(String input, ResultFuture resultFuture) throws Exception {}@Overridepublic void close() throws Exception {super.close();if (redisClient != null) {redisClient.close(null);}}
Fink异步IO的实战(关联维表)相关推荐
- Flink SQL中使用异步io关联维表
目录 一.目标 二.方案 2.1. 自己实现demo: 2.2. 社区实现: 一.目标 希望使用flink sql来关联维度表,但是想用异步IO的方式关联. 二.方案 当前社区进展:目前Flink S ...
- Flink异步io应用场景之流表join维表
简介 维度表,作为数据仓库里面的概念,是维度属性的集合,比如时间维.地点维:可以是一个mysql或者cassandra,redis等存储,甚至是自己定义的一些api. 流表是kafka等流式数据. 根 ...
- 流计算技术实战 - 超大维表问题
维度表,作为数据仓库里面的概念,是维度属性的集合,比如时间维.地点维; 但这里要讨论流计算中的维度表问题, 流计算中维表问题和数据仓库中有所不同,往往是因为通过agent采集到的数据比较有限,在做数据 ...
- 【Flink】Flink1.12.0 FlinkSQL消费Kafka 使用 temporal join 关联维表Hive 最新分区数据 join 不上
文章目录 1.概述 2.场景1 2.1 概述 2.2 解决 1.概述 2.场景1 2.1 概述 场景是这样的 Flink SQL 消费kafka 关联Hive维表 最新分区 刚开始 我往Hive维表里 ...
- 【Flink系列】开发篇:1. Flink维表关联方案
数据流往往需要访问外部的数据源来丰富自己的信息,比如通过record中的ip地址查询ip数据库maxmind的GeoIP2 Databases得到ip对应的城市名称,城市经纬度,将这些作为新的字段添加 ...
- Flink实时数据处理实践经验(Flink去重、维表关联、定时器、双流join)
Flink实时数据处理实践经验 文章目录 Flink实时数据处理实践经验 1. 数据输入与预处理 2. 实时数据处理 3. 实时数仓架构 4. 优化方案 Java.大数据开发学习要点(持续更新中-) ...
- Flink SQL 功能解密系列 —— 维表 JOIN 与异步优化
2019独角兽企业重金招聘Python工程师标准>>> 引子 流计算中一个常见的需求就是为数据流补齐字段.因为数据采集端采集到的数据往往比较有限,在做数据分析之前,就要先将所需的维度 ...
- 95-134-112-源码-维表-全量加载MySQL
1 .世界 2.概述 在维表关联中定时全量加载是针对维表数据量较少并且业务对维表数据变化的敏感程度较低的情况下可采取的一种策略,对于这种方案使用有几点需要注意: 全量加载有可能会比较耗时,所以必须是一 ...
- flink中维表Join几种常见方式总结
flink中维表Join 需求如下: 一个主流中数据是用户信息,字段包括用户姓名.城市id: 维表是城市数据,字段包括城市ID.城市名称. 要求用户表与城市表关联,输出为:用户名称.城市ID.城市名称 ...
最新文章
- 使用Pad Designer制作焊盘
- ubuntu 环境下调试mysql源码_Linux中eclipse调试mysql源代码
- 【Boost】boost库中thread多线程详解3——细说lock_guard
- linux服务器加入windows域时报错Ticket expired
- 大型互联网架构演变历程-《淘宝技术这10年》
- python __xxxitem__
- 锁屏界面_强迫症必爱!iPhone怎样隐藏锁屏界面的手电筒、相机图标?
- python中国官网-Python教程
- 【学习笔记】《光纤传感器振动系统信号解调技术研究--华北电力--控制工程--张**》重点笔记
- 科大讯飞语音合成WebApi
- EXCEL散点图怎么做
- 基于Python统计红楼梦中人物信息
- win10计算机怎么连接网络,如何创建宽带连接_win10电脑宽带连接怎么创建 - 驱动管家...
- android 行车记录仪分析,基于Android的智能行车记录仪的设计与实现.doc
- gif透明背景动画_BMP、GIF、TIFF、PNG、JPG和SVG格式图像的特点
- WordPress个人资料中直接修改用户名插件Username Changer
- Java基础 - 坦克大战(第四章,线程基础)
- Spark之SparkSQL
- YOLOX训练:显存足够,但依旧CUDA out of memory(Tried to allocate 5.58 GiB,8.00 GiB total capacity,6.40 GiB free)
- OpenSSL密码库算法笔记——第3.1.1章 模加
热门文章
- C语言折半查找法(超详细)
- Semaphore的概念及基本用法
- php+html配合方式小结
- 第一次暑期集训之前期排位赛
- IBM热门职位随手掰掰 --- 客串猎头?
- 谷歌浏览器怎么长截图怎么截_谷歌浏览器长截图怎么截图_chrome谷歌浏览器截长图的步骤-win7之家...
- CRect 基本用法
- 13寸MacBook Air 与 Pro低配、Pro高配到底有啥区别?
- arachni web mysql数据库_Arachni
- 赴一场开源盛会丨10月29日 COSCon‘22 开源年会杭州分会场,这里只差一个「你」!