Flink程序加载数据源(3)自定义数据源(1)
文章目录
- 代码实现
- ① 准备环境
- ② 获取数据源
- ③ 从Mysql中获取数据源示例
flink 可以从我们常用的各种DB
、文件(HDFS/LOCAL)
、SCOKET
、MQ
等等…中加载数据,Flink官方也提供了一些connectors
(连接器 理解为springboot-start-xx即可),引入依赖后并进行配置后,可快速获取到数据源。
EX:
- flink-connector-redis
- flink-connector-kafka
- flink-connector-jdbc
同时,Flink还提供了数据源接口(抽象类),我们实现该接口(继承抽象类)就可以实现自定义数据源,不同的接口(抽象类)功能的丰富性与范围不同,分类如下:
EX:
SourceFunction: 非并行数据源(并行度只能=1)
RichSourceFunction: 多功能非并行数据源(并行度只能=1)
ParallelSourceFunction: 并行数据源(并行度能够>=1)
RichParallelSourceFunction: 多功能并行数据源(并行度能够>=1)
代码实现
如上所说,实现flink
为我们提供的一些数据源接口,即能够实现自定义数据源了!
env.addSource(自定义数据源类对象);
下边进行完整示例演示:
① 准备环境
//准备环境 env
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
env.setParallelism(1);
env.setRuntimeMode(RuntimeExecutionMode.AUTOMATIC);
② 获取数据源
env.addSource(自定义数据源类对象);
③ 从Mysql中获取数据源示例
数据对象
@Data
@NoArgsConstructor
@AllArgsConstructor
public static class VehicleAlarm {private String id;private String licensePlate;private String plateColor;private Long deviceTime;private String zone;
}
自定义数据源类
public static class MysqlSource extends RichParallelSourceFunction<VehicleAlarm> {Connection conn = null;PreparedStatement ps = null;ResultSet result = null;private boolean flag = true;String url = "jdbc:mysql://xxx:3306/alarm-sc?useUnicode=true&characterEncoding=utf-8&useSSL=false";@Overridepublic void open(Configuration parameters) throws Exception {conn = DriverManager.getConnection(url, "root", "root");String sql = "select id,license_plate,plate_color,device_time,`zone` from vehicle_alarm_202103";ps = conn.prepareStatement(sql);super.open(parameters);}@Overridepublic void run(SourceContext<VehicleAlarm> ctx) throws Exception {while (flag) {result = ps.executeQuery();while (result.next()) {String id = result.getString("id");String licensePlate = result.getString("license_plate");String plateColor = result.getString("plate_color");Long deviceTime = result.getLong("device_time");String zone = result.getString("zone");VehicleAlarm vehicleAlarm = new VehicleAlarm(id, licensePlate, plateColor, deviceTime, zone);ctx.collect(vehicleAlarm);}Thread.sleep(2000);}}@Overridepublic void cancel() {flag = false;}@Overridepublic void close() throws Exception {if (conn != null) {conn.close();}if (ps != null) {ps.close();}if (result != null) {result.close();}}
}
结果展示:
方法以及特别属性解释说明:
- open():数据源最开始打开时执行,整个数据源从加载到销毁,只会执行一次
- run(SourceContex):实现数据获取逻辑,并可以通过传入的参数ctx进行向下游节点的数据转发。
- SourceContext:source函数用于发出元素和可能的watermark的接口,确定以及返回source生成的元素的类型。
- cancel():用来取消数据源,一般在run方法中,会存在一个循环来持续产生数据,cancel方法则可以使该循环终止。
- close():数据源关闭时执行,整个数据源从加载到销毁,只会执行一次
Flink完整流程代码:
//准备环境 env
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
env.setRuntimeMode(RuntimeExecutionMode.AUTOMATIC);
env.setParallelism(2);
//获取数据源 source
DataStreamSource<VehicleAlarm> streamSource = env.addSource(new MysqlSource());
//数据处理 todo
streamSource.print();
//数据收集 sink
//程序执行 execute
env.execute("mysql-source");
Flink程序加载数据源(3)自定义数据源(1)相关推荐
- Flink程序加载数据源(3)自定义数据源(2)从Mysql 加载数据源
Flink程序加载数据源(3)自定义数据源(2)从Mysql 加载数据源 上文引出了Flink程序自定义数据源的方法,我们来再次回顾下. Flink还提供了数据源接口(抽象类),我们实现该接口 ...
- [转载]spring+mybatis加载属性文件设置数据源失败原因及解决方案 - 泡在网上的日子
spring+mybatis加载属性文件设置数据源失败原因及解决方案 - 泡在网上的日子 http://www.jcodecraeer.com/a/chengxusheji/java/2013/062 ...
- Spring容器加载时执行自定义的方法
Spring容器加载时执行自定义的方法 需要实现的接口InitializingBean,ApplicationContextAware 案例 package com.djhu.research.web ...
- (!详解 Pytorch实战:①)kaggle猫狗数据集二分类:加载(集成/自定义)数据集
这系列的文章是我对Pytorch入门之后的一个总结,特别是对数据集生成加载这一块加强学习 另外,这里有一些比较常用的数据集,大家可以进行下载: 需要注意的是,本篇文章使用的PyTorch的版本是v0. ...
- iOS进阶之底层原理-应用程序加载(dyld加载流程、类与分类的加载)
iOS应用程序的入口是main函数,那么main函数之前系统做了什么呢? 我们定义一个类方法load,打断点,查看栈进程,我们发现dyld做了很多事,接下来就来探究到底dyld做了什么. 什么是dyl ...
- php点击查看更多,微信小程序加载更多和点击查看更多功能介绍
这篇文章主要为大家详细介绍了微信小程序加载更多,点击查看更多功能,具有一定的参考价值,感兴趣的小伙伴们可以参考一下 本文实例为大家分享了微信小程序加载更多功能实现的具体代码,供大家参考,具体内容如下 ...
- windows系统-程序加载时生成栈和堆的过程
EXE文件的内容分为再配置信息.变量组和函数组,这一点想必大家都清楚了吧.不过,当程序加载到内存后,除此之外还会额外生成两个组,那就是栈和堆.栈是用来存储函数内部临时使用的变量(局部变量[注1]), ...
- 【OS学习笔记】十 实模式:实现一个程序加载器-程序加载器如何将用户程序加载到内存并执行
上一篇文章学习了以下内容: 用一种不同的分段方法,从另一个不同的的角度理解处理器的分段内存访问机制 使用循环和条件转移指令来优化主引导扇区代码 点击链接查看上一篇文章:点击链接查看 对于主引导扇区部分 ...
- 计算机系统-程序加载器
本文将实现一个简单的程序加载器,首先要先了解一下实模式下的内存空间分配,这是固定好的 要实现一个程序加载器,需要实现下面4个步骤 1.将用户程序从硬盘中读取到10000处(当然,放到10000-9FF ...
最新文章
- 设计模式之解释器模式(Interpreter)摘录
- 小孩学python有意义吗-世界冠军教练告诉你:少儿编程这些坑,能不踩就别踩!...
- Visual 数据绑定
- Linux 进程间通讯详解一
- 某些您可以编辑的区域交叠在一起 可能不能同时显示_DX200操作要领—修改与编辑程序(三十九)...
- js事件里面套事件怎么不管用_原生js利用localstorage实现简易TODO list应用
- 在docker for win中使用portainer管理容器
- 回测引擎代码分析流程图
- eclipse adt如何切换到设计界面_如何设计出优秀的UI界面?这4个方面帮你快速优化...
- JMV监控工具之JConsole
- 计算点到SVM超平面的距离
- Eurek自我保护机制
- 预后建模绕不开的lasso cox回归
- python jupyter notebook怎么调字体大小_配置Jupyter的代码主题 字体以及字体大小 代码自动补全...
- 【产品开发】北邮国际学院大二下期末复习
- 大多数人都会遇到的几个H5坑(实战)
- 一套系统要不要这样贵,5亿美元
- 初探使用iOS 7 Sprite Kit与Cocos2d开发游戏的对比(一家之言)
- 你把 《时间》 玩明白
- python 通讯录系统_Python实现通讯录功能