文章目录

  • 代码实现
    • ① 准备环境
    • ② 获取数据源
    • ③ 从Mysql中获取数据源示例

flink 可以从我们常用的各种DB文件(HDFS/LOCAL)SCOKETMQ等等…中加载数据,Flink官方也提供了一些connectors(连接器 理解为springboot-start-xx即可),引入依赖后并进行配置后,可快速获取到数据源。

EX:

  • flink-connector-redis
  • flink-connector-kafka
  • flink-connector-jdbc

同时,Flink还提供了数据源接口(抽象类),我们实现该接口(继承抽象类)就可以实现自定义数据源,不同的接口(抽象类)功能的丰富性与范围不同,分类如下:

EX:

  • SourceFunction: 非并行数据源(并行度只能=1)

  • RichSourceFunction: 多功能非并行数据源(并行度只能=1)

  • ParallelSourceFunction: 并行数据源(并行度能够>=1)

  • RichParallelSourceFunction: 多功能并行数据源(并行度能够>=1)

代码实现

如上所说,实现flink为我们提供的一些数据源接口,即能够实现自定义数据源了!

env.addSource(自定义数据源类对象);

下边进行完整示例演示:

① 准备环境

//准备环境 env
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
env.setParallelism(1);
env.setRuntimeMode(RuntimeExecutionMode.AUTOMATIC);

② 获取数据源

env.addSource(自定义数据源类对象);

③ 从Mysql中获取数据源示例

数据对象

@Data
@NoArgsConstructor
@AllArgsConstructor
public static class VehicleAlarm {private String id;private String licensePlate;private String plateColor;private Long deviceTime;private String zone;
}

自定义数据源类

public static class MysqlSource extends RichParallelSourceFunction<VehicleAlarm> {Connection conn = null;PreparedStatement ps = null;ResultSet result = null;private boolean flag = true;String url = "jdbc:mysql://xxx:3306/alarm-sc?useUnicode=true&characterEncoding=utf-8&useSSL=false";@Overridepublic void open(Configuration parameters) throws Exception {conn = DriverManager.getConnection(url, "root", "root");String sql = "select id,license_plate,plate_color,device_time,`zone` from vehicle_alarm_202103";ps = conn.prepareStatement(sql);super.open(parameters);}@Overridepublic void run(SourceContext<VehicleAlarm> ctx) throws Exception {while (flag) {result = ps.executeQuery();while (result.next()) {String id = result.getString("id");String licensePlate = result.getString("license_plate");String plateColor = result.getString("plate_color");Long deviceTime = result.getLong("device_time");String zone = result.getString("zone");VehicleAlarm vehicleAlarm = new VehicleAlarm(id, licensePlate, plateColor, deviceTime, zone);ctx.collect(vehicleAlarm);}Thread.sleep(2000);}}@Overridepublic void cancel() {flag = false;}@Overridepublic void close() throws Exception {if (conn != null) {conn.close();}if (ps != null) {ps.close();}if (result != null) {result.close();}}
}

结果展示:

方法以及特别属性解释说明:

  • open():数据源最开始打开时执行,整个数据源从加载到销毁,只会执行一次
  • run(SourceContex):实现数据获取逻辑,并可以通过传入的参数ctx进行向下游节点的数据转发。
  • SourceContext:source函数用于发出元素和可能的watermark的接口,确定以及返回source生成的元素的类型。
  • cancel():用来取消数据源,一般在run方法中,会存在一个循环来持续产生数据,cancel方法则可以使该循环终止。
  • close():数据源关闭时执行,整个数据源从加载到销毁,只会执行一次

Flink完整流程代码:

//准备环境 env
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
env.setRuntimeMode(RuntimeExecutionMode.AUTOMATIC);
env.setParallelism(2);
//获取数据源 source
DataStreamSource<VehicleAlarm> streamSource = env.addSource(new MysqlSource());
//数据处理 todo
streamSource.print();
//数据收集 sink
//程序执行 execute
env.execute("mysql-source");

Flink程序加载数据源(3)自定义数据源(1)相关推荐

  1. Flink程序加载数据源(3)自定义数据源(2)从Mysql 加载数据源

    Flink程序加载数据源(3)自定义数据源(2)从Mysql 加载数据源 ​ 上文引出了Flink程序自定义数据源的方法,我们来再次回顾下. ​ Flink还提供了数据源接口(抽象类),我们实现该接口 ...

  2. [转载]spring+mybatis加载属性文件设置数据源失败原因及解决方案 - 泡在网上的日子

    spring+mybatis加载属性文件设置数据源失败原因及解决方案 - 泡在网上的日子 http://www.jcodecraeer.com/a/chengxusheji/java/2013/062 ...

  3. Spring容器加载时执行自定义的方法

    Spring容器加载时执行自定义的方法 需要实现的接口InitializingBean,ApplicationContextAware 案例 package com.djhu.research.web ...

  4. (!详解 Pytorch实战:①)kaggle猫狗数据集二分类:加载(集成/自定义)数据集

    这系列的文章是我对Pytorch入门之后的一个总结,特别是对数据集生成加载这一块加强学习 另外,这里有一些比较常用的数据集,大家可以进行下载: 需要注意的是,本篇文章使用的PyTorch的版本是v0. ...

  5. iOS进阶之底层原理-应用程序加载(dyld加载流程、类与分类的加载)

    iOS应用程序的入口是main函数,那么main函数之前系统做了什么呢? 我们定义一个类方法load,打断点,查看栈进程,我们发现dyld做了很多事,接下来就来探究到底dyld做了什么. 什么是dyl ...

  6. php点击查看更多,微信小程序加载更多和点击查看更多功能介绍

    这篇文章主要为大家详细介绍了微信小程序加载更多,点击查看更多功能,具有一定的参考价值,感兴趣的小伙伴们可以参考一下 本文实例为大家分享了微信小程序加载更多功能实现的具体代码,供大家参考,具体内容如下 ...

  7. windows系统-程序加载时生成栈和堆的过程

    EXE文件的内容分为再配置信息.变量组和函数组,这一点想必大家都清楚了吧.不过,当程序加载到内存后,除此之外还会额外生成两个组,那就是栈和堆.栈是用来存储函数内部临时使用的变量(局部变量[注1]), ...

  8. 【OS学习笔记】十 实模式:实现一个程序加载器-程序加载器如何将用户程序加载到内存并执行

    上一篇文章学习了以下内容: 用一种不同的分段方法,从另一个不同的的角度理解处理器的分段内存访问机制 使用循环和条件转移指令来优化主引导扇区代码 点击链接查看上一篇文章:点击链接查看 对于主引导扇区部分 ...

  9. 计算机系统-程序加载器

    本文将实现一个简单的程序加载器,首先要先了解一下实模式下的内存空间分配,这是固定好的 要实现一个程序加载器,需要实现下面4个步骤 1.将用户程序从硬盘中读取到10000处(当然,放到10000-9FF ...

最新文章

  1. 设计模式之解释器模式(Interpreter)摘录
  2. 小孩学python有意义吗-世界冠军教练告诉你:少儿编程这些坑,能不踩就别踩!...
  3. Visual 数据绑定
  4. Linux 进程间通讯详解一
  5. 某些您可以编辑的区域交叠在一起 可能不能同时显示_DX200操作要领—修改与编辑程序(三十九)...
  6. js事件里面套事件怎么不管用_原生js利用localstorage实现简易TODO list应用
  7. 在docker for win中使用portainer管理容器
  8. 回测引擎代码分析流程图
  9. eclipse adt如何切换到设计界面_如何设计出优秀的UI界面?这4个方面帮你快速优化...
  10. JMV监控工具之JConsole
  11. 计算点到SVM超平面的距离
  12. Eurek自我保护机制
  13. 预后建模绕不开的lasso cox回归
  14. python jupyter notebook怎么调字体大小_配置Jupyter的代码主题 字体以及字体大小 代码自动补全...
  15. 【产品开发】北邮国际学院大二下期末复习
  16. 大多数人都会遇到的几个H5坑(实战)
  17. 一套系统要不要这样贵,5亿美元
  18. 初探使用iOS 7 Sprite Kit与Cocos2d开发游戏的对比(一家之言)
  19. 你把 《时间》 玩明白
  20. python 通讯录系统_Python实现通讯录功能

热门文章

  1. Android音乐播放器开发的MediaPlayer出现IllegalStateException
  2. 【JZOJ4599】西行妖
  3. JavaScript学习手册六:JS条件语句
  4. vux 选择器_vue项目中vux的使用
  5. 深度讲解TS:这样学TS,迟早进大厂【15】:字符串字面量类型
  6. 计算机网络军训口号,关于新生军训的班级霸气口号、标语简短励志句子合集
  7. 记录:jeecg boot 路由带多种参数的配置
  8. 可视化决策树之Python实现
  9. 技术分享| 音视频与微信小程序互通实践
  10. 床前明月光,熊猫在烧香