flink mysql sink_聊聊flink的sink
概述
flink的sink是flink三大逻辑结构之一(source,transform,sink),功能就是负责把flink处理后的数据输出到外部系统中,flink 的sink和source的代码结构类似。
在编写代码的过程中,我们可以使用flink已经提供的sink,如kafka,jdbc,es等,当然我们也可以通过自定义的方式,来实现我们自己的sink。下面说明核心类
核心类
SinkFunction 是一个接口,类似于SourceFunction接口。SinkFunction中主要包含一个方法,那就是用于数据输出的invoke 方法,每条记录都会执行一次invoke方法,用于执行输出操作。
// Writes the given value to the sink. This function is called for every record.default void invoke(IN value) throws Exception
default void invoke(IN value, Context context) throws Exception
// Context接口中返回关于时间的信息interface Context {
/** Returns the current processing time. */
long currentProcessingTime();
/** Returns the current event-time watermark. */
long currentWatermark();
/*** Returns the timestamp of the current input record or {@code null} if the element does not* have an assigned timestamp.*/
Long timestamp();
}
我们一般自定义Sink的时候,都是继承AbstractRichFunction,他是一个抽象类,实现了RichFunction接口。
public abstract class AbstractRichFunction implements RichFunction, Serializable
并且提供了关于RuntimContext的操作和open,clone方法。
AbstractRichFunction 有很多实现类,比如关于msyql操作的JDBCSinkFunction,比如直接输出结果的 PrintSinkFunction,在我们开发的过程中,我们进程用print语句来打印结果,但是print函数中就是讲PrintSinkFunction类传递到addSink方法中。
public DataStreamSink print() {
PrintSinkFunction printFunction = new PrintSinkFunction<>();
return addSink(printFunction).name("Print to Std. Out");
}
PrintSinkFunction
我们这里分析一下PrintSinkFunction这个类,这个类就是将没个元素输出到标准输出或者是标准错误输出流中。
public class PrintSinkFunction extends RichSinkFunction {
private static final long serialVersionUID = 1L;
private final PrintSinkOutputWriter writer;
/*** Instantiates a print sink function that prints to standard out.*/
public PrintSinkFunction() {
writer = new PrintSinkOutputWriter<>(false);
}
/*** Instantiates a print sink function that prints to standard out.** @param stdErr True, if the format should print to standard error instead of standard out.*/
public PrintSinkFunction(final boolean stdErr) {
writer = new PrintSinkOutputWriter<>(stdErr);
}
/*** Instantiates a print sink function that prints to standard out and gives a sink identifier.** @param stdErr True, if the format should print to standard error instead of standard out.* @param sinkIdentifier Message that identify sink and is prefixed to the output of the value*/
public PrintSinkFunction(final String sinkIdentifier, final boolean stdErr) {
writer = new PrintSinkOutputWriter<>(sinkIdentifier, stdErr);
}
@Override
public void open(Configuration parameters) throws Exception {
super.open(parameters);
StreamingRuntimeContext context = (StreamingRuntimeContext) getRuntimeContext();
writer.open(context.getIndexOfThisSubtask(), context.getNumberOfParallelSubtasks());
}
@Override
public void invoke(IN record) {
writer.write(record);
}
@Override
public String toString() {
return writer.toString();
}
}
分析:
1、调用构造函数来创建一个PrintSinkOutputWriter
2、调用open方法中在调用PrintSinkOutputWriter 的open方法,进行初始化
3、调用invoke方法,通过PrintSinkOutputWriter 的writer方法吧record输出
自定义sink
我们这里自定义一个msyql的sink,也就是把flink中的数据,最后输出到mysql中。
public class MyMysqlSink extends RichSinkFunction {
private PreparedStatement ps = null;
private Connection connection = null;
String driver = "com.mysql.jdbc.Driver";
String url = "jdbc:mysql://127.0.0.1:3306/flinkdb";
String username = "root";
String password = "root";
// 初始化方法 @Override
public void open(Configuration parameters) throws Exception {
super.open(parameters);
// 获取连接 connection = getConn();
//执行查询 ps = connection.prepareStatement("select * from person;");
}
private Connection getConn() {
try {
Class.forName(driver);
connection = DriverManager.getConnection(url, username, password);
} catch (Exception e) {
e.printStackTrace();
}
return connection;
}
//Writes the given value to the sink. This function is called for every record. //每一个元素的插入,都会被调用一次invoke方法 @Override
public void invoke(Person p, Context context) throws Exception {
ps.setString(1,p.getName());
ps.setInt(2,p.getAge());
ps.executeUpdate();
}
@Override
public void close() throws Exception {
super.close();
if(connection != null){
connection.close();
}
if (ps != null){
ps.close();
}
}
}
程序调用入口
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
// 添加自定义数据源 DataStreamSource data = env.addSource(new MyMysqlSource());
data.print().setParallelism(2);
data.addSink(new MyMysqlSink());
// 提交执行任务env.execute("MySourceMysql");
flink mysql sink_聊聊flink的sink相关推荐
- flink mysql日志,关于flink的日志文件设置
1)yarn的启动脚本 exec /bin/bash -c "$JAVA_HOME/bin/java -Xms580m -Xmx580m -XX:MaxDirectMemorySize=32 ...
- flink mysql cdc到kafka
思路: 1.创建flink mysql cdc表 2.将order join products的结果写入到kafka表中. 这样就相当于完成了,DWD中的事实表构建.写入到kafka中,等待消费构建D ...
- Flink系列之:Flink CDC深入了解MySQL CDC连接器
Flink系列之:Flink CDC深入了解MySQL CDC连接器 一.增量快照特性 1.增量快照读取 2.并发读取 3.全量阶段支持 checkpoint 4.无锁算法 5.MySQL高可用性支持 ...
- 聊聊flink的FsStateBackend
序 本文主要研究一下flink的FsStateBackend StateBackend flink-runtime_2.11-1.7.0-sources.jar!/org/apache/flink/r ...
- Flink CDC 系列(3)—— Flink CDC MySQL Connector 与 Flink SQL 的结合使用案例Demo
Flink CDC 系列文章: <Flink CDC 系列(1)-- 什么是 Flink CDC> <Flink CDC 系列(2)-- Flink CDC 源码编译> < ...
- 聊聊flink的logback配置
序 本文主要研究一下flink的logback配置 client端pom文件配置 <dependencies><!-- Add the two required logback de ...
- Flink JDBC Connector:Flink 与数据库集成最佳实践
整理:陈政羽(Flink 社区志愿者) 摘要:Flink 1.11 引入了 CDC,在此基础上, JDBC Connector 也发生比较大的变化,本文由 Apache Flink Contribut ...
- 轻松入门进阶Flink第十课 Flink 面试
第39讲:Flink 面试-基础篇 到目前为止,关于 Flink 的学习我们就告一段落了,接下来我们将进入最后一个面试模块的学习.在当前大背景下,面试这一关是求职者必须要面对的,也能从侧面考察对 Fl ...
- 学习笔记Flink(一)—— Flink简介(介绍、基本概念、应用场景)
一.Flink介绍 Apache Flink 是一个分布式流批一体化的开源平台.Flink 的核心是一个提供数据分发.通信以及自动容错的流计算引擎.Flink 在流计算之上构建批处理,并且原生的支持迭 ...
最新文章
- 十一、FCFS(先来先服务)、SJF(短作业优先)、HRRN(高响应比优先)
- 谈谈离散卷积和卷积神经网络
- SAP Spartacus的site context配置
- 这是一段关乎你的代码:你的未来 我们正在参与
- inotify之文件系统事件监控使用入门
- greenplum 查询出来的数字加减日期_POLA宝丽美白精华怎么查看生产日期保质期?保质期时间是几年的?查批号在哪里查?...
- JDK1.8简单配置环境变量---两步曲
- LOADRUNNER连接ORACLE数据库的方法
- java soap api操作和发送soap消息
- 奇安信代码安全实验室帮助 RedHat 修复两个 oVirt 漏洞,获官方致谢
- ubuntu python3.7 gblic问题_glibc
- .net MVC小尝试
- 一个初学者的辛酸路程-常用模块-6
- Win10安装Centos7双系统
- 双稳态一键开关机电路
- MCU、MPU、DSP、FPGA是什么意思
- Android Q版本实现自动连接WiFi
- 设置页面左右结构(其中一端自适应)
- 浅谈const int *,int const *与int *const
- 微信小程序webview识别二维码长按点击识别二维码
热门文章
- php 身份认证 claim,asp.net core cookie身份认证view视图中读取/读取User.Claims中的值实例...
- 蚂蚁、原形、大水牛和粗麻布
- MBD建模规范 stateflow建模 无规矩不成方圆 规范建模行为 提升建模效率 。 主要包含模型接口界面及布局,模块的安全使用、预防易出错的建模模式
- 安全宝金融创新论坛:民企有胜出机会
- Ceisum开发资料整理
- 最落魄的时候,身上带着《毛选》
- 制作自己的pods 并上传到cocopods上
- Java中字符串CST的时间日期转换
- 《全球互联网金融商业模式:格局与发展》——第2章,第2节在线折扣券商
- python操作Oracle、PostgreSQL,MySql数据库增删改查