概述

flink的sink是flink三大逻辑结构之一(source,transform,sink),功能就是负责把flink处理后的数据输出到外部系统中,flink 的sink和source的代码结构类似。

在编写代码的过程中,我们可以使用flink已经提供的sink,如kafka,jdbc,es等,当然我们也可以通过自定义的方式,来实现我们自己的sink。下面说明核心类

核心类

SinkFunction 是一个接口,类似于SourceFunction接口。SinkFunction中主要包含一个方法,那就是用于数据输出的invoke 方法,每条记录都会执行一次invoke方法,用于执行输出操作。

// Writes the given value to the sink. This function is called for every record.default void invoke(IN value) throws Exception

default void invoke(IN value, Context context) throws Exception

// Context接口中返回关于时间的信息interface Context {

/** Returns the current processing time. */

long currentProcessingTime();

/** Returns the current event-time watermark. */

long currentWatermark();

/*** Returns the timestamp of the current input record or {@code null} if the element does not* have an assigned timestamp.*/

Long timestamp();

}

我们一般自定义Sink的时候,都是继承AbstractRichFunction,他是一个抽象类,实现了RichFunction接口。

public abstract class AbstractRichFunction implements RichFunction, Serializable

并且提供了关于RuntimContext的操作和open,clone方法。

AbstractRichFunction 有很多实现类,比如关于msyql操作的JDBCSinkFunction,比如直接输出结果的 PrintSinkFunction,在我们开发的过程中,我们进程用print语句来打印结果,但是print函数中就是讲PrintSinkFunction类传递到addSink方法中。

public DataStreamSink print() {

PrintSinkFunction printFunction = new PrintSinkFunction<>();

return addSink(printFunction).name("Print to Std. Out");

}

PrintSinkFunction

我们这里分析一下PrintSinkFunction这个类,这个类就是将没个元素输出到标准输出或者是标准错误输出流中。

public class PrintSinkFunction extends RichSinkFunction {

private static final long serialVersionUID = 1L;

private final PrintSinkOutputWriter writer;

/*** Instantiates a print sink function that prints to standard out.*/

public PrintSinkFunction() {

writer = new PrintSinkOutputWriter<>(false);

}

/*** Instantiates a print sink function that prints to standard out.** @param stdErr True, if the format should print to standard error instead of standard out.*/

public PrintSinkFunction(final boolean stdErr) {

writer = new PrintSinkOutputWriter<>(stdErr);

}

/*** Instantiates a print sink function that prints to standard out and gives a sink identifier.** @param stdErr True, if the format should print to standard error instead of standard out.* @param sinkIdentifier Message that identify sink and is prefixed to the output of the value*/

public PrintSinkFunction(final String sinkIdentifier, final boolean stdErr) {

writer = new PrintSinkOutputWriter<>(sinkIdentifier, stdErr);

}

@Override

public void open(Configuration parameters) throws Exception {

super.open(parameters);

StreamingRuntimeContext context = (StreamingRuntimeContext) getRuntimeContext();

writer.open(context.getIndexOfThisSubtask(), context.getNumberOfParallelSubtasks());

}

@Override

public void invoke(IN record) {

writer.write(record);

}

@Override

public String toString() {

return writer.toString();

}

}

分析:

1、调用构造函数来创建一个PrintSinkOutputWriter

2、调用open方法中在调用PrintSinkOutputWriter 的open方法,进行初始化

3、调用invoke方法,通过PrintSinkOutputWriter 的writer方法吧record输出

自定义sink

我们这里自定义一个msyql的sink,也就是把flink中的数据,最后输出到mysql中。

public class MyMysqlSink extends RichSinkFunction {

private PreparedStatement ps = null;

private Connection connection = null;

String driver = "com.mysql.jdbc.Driver";

String url = "jdbc:mysql://127.0.0.1:3306/flinkdb";

String username = "root";

String password = "root";

// 初始化方法 @Override

public void open(Configuration parameters) throws Exception {

super.open(parameters);

// 获取连接 connection = getConn();

//执行查询 ps = connection.prepareStatement("select * from person;");

}

private Connection getConn() {

try {

Class.forName(driver);

connection = DriverManager.getConnection(url, username, password);

} catch (Exception e) {

e.printStackTrace();

}

return connection;

}

//Writes the given value to the sink. This function is called for every record. //每一个元素的插入,都会被调用一次invoke方法 @Override

public void invoke(Person p, Context context) throws Exception {

ps.setString(1,p.getName());

ps.setInt(2,p.getAge());

ps.executeUpdate();

}

@Override

public void close() throws Exception {

super.close();

if(connection != null){

connection.close();

}

if (ps != null){

ps.close();

}

}

}

程序调用入口

StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

// 添加自定义数据源 DataStreamSource data = env.addSource(new MyMysqlSource());

data.print().setParallelism(2);

data.addSink(new MyMysqlSink());

// 提交执行任务env.execute("MySourceMysql");

flink mysql sink_聊聊flink的sink相关推荐

  1. flink mysql日志,关于flink的日志文件设置

    1)yarn的启动脚本 exec /bin/bash -c "$JAVA_HOME/bin/java -Xms580m -Xmx580m -XX:MaxDirectMemorySize=32 ...

  2. flink mysql cdc到kafka

    思路: 1.创建flink mysql cdc表 2.将order join products的结果写入到kafka表中. 这样就相当于完成了,DWD中的事实表构建.写入到kafka中,等待消费构建D ...

  3. Flink系列之:Flink CDC深入了解MySQL CDC连接器

    Flink系列之:Flink CDC深入了解MySQL CDC连接器 一.增量快照特性 1.增量快照读取 2.并发读取 3.全量阶段支持 checkpoint 4.无锁算法 5.MySQL高可用性支持 ...

  4. 聊聊flink的FsStateBackend

    序 本文主要研究一下flink的FsStateBackend StateBackend flink-runtime_2.11-1.7.0-sources.jar!/org/apache/flink/r ...

  5. Flink CDC 系列(3)—— Flink CDC MySQL Connector 与 Flink SQL 的结合使用案例Demo

    Flink CDC 系列文章: <Flink CDC 系列(1)-- 什么是 Flink CDC> <Flink CDC 系列(2)-- Flink CDC 源码编译> < ...

  6. 聊聊flink的logback配置

    序 本文主要研究一下flink的logback配置 client端pom文件配置 <dependencies><!-- Add the two required logback de ...

  7. Flink JDBC Connector:Flink 与数据库集成最佳实践

    整理:陈政羽(Flink 社区志愿者) 摘要:Flink 1.11 引入了 CDC,在此基础上, JDBC Connector 也发生比较大的变化,本文由 Apache Flink Contribut ...

  8. 轻松入门进阶Flink第十课 Flink 面试

    第39讲:Flink 面试-基础篇 到目前为止,关于 Flink 的学习我们就告一段落了,接下来我们将进入最后一个面试模块的学习.在当前大背景下,面试这一关是求职者必须要面对的,也能从侧面考察对 Fl ...

  9. 学习笔记Flink(一)—— Flink简介(介绍、基本概念、应用场景)

    一.Flink介绍 Apache Flink 是一个分布式流批一体化的开源平台.Flink 的核心是一个提供数据分发.通信以及自动容错的流计算引擎.Flink 在流计算之上构建批处理,并且原生的支持迭 ...

最新文章

  1. 十一、FCFS(先来先服务)、SJF(短作业优先)、HRRN(高响应比优先)
  2. 谈谈离散卷积和卷积神经网络
  3. SAP Spartacus的site context配置
  4. 这是一段关乎你的代码:你的未来 我们正在参与
  5. inotify之文件系统事件监控使用入门
  6. greenplum 查询出来的数字加减日期_POLA宝丽美白精华怎么查看生产日期保质期?保质期时间是几年的?查批号在哪里查?...
  7. JDK1.8简单配置环境变量---两步曲
  8. LOADRUNNER连接ORACLE数据库的方法
  9. java soap api操作和发送soap消息
  10. 奇安信代码安全实验室帮助 RedHat 修复两个 oVirt 漏洞,获官方致谢
  11. ubuntu python3.7 gblic问题_glibc
  12. .net MVC小尝试
  13. 一个初学者的辛酸路程-常用模块-6
  14. Win10安装Centos7双系统
  15. 双稳态一键开关机电路
  16. MCU、MPU、DSP、FPGA是什么意思
  17. Android Q版本实现自动连接WiFi
  18. 设置页面左右结构(其中一端自适应)
  19. 浅谈const int *,int const *与int *const
  20. 微信小程序webview识别二维码长按点击识别二维码

热门文章

  1. php 身份认证 claim,asp.net core cookie身份认证view视图中读取/读取User.Claims中的值实例...
  2. 蚂蚁、原形、大水牛和粗麻布
  3. MBD建模规范 stateflow建模 无规矩不成方圆 规范建模行为 提升建模效率 。 主要包含模型接口界面及布局,模块的安全使用、预防易出错的建模模式
  4. 安全宝金融创新论坛:民企有胜出机会
  5. Ceisum开发资料整理
  6. 最落魄的时候,身上带着《毛选》
  7. 制作自己的pods 并上传到cocopods上
  8. Java中字符串CST的时间日期转换
  9. 《全球互联网金融商业模式:格局与发展》——第2章,第2节在线折扣券商
  10. python操作Oracle、PostgreSQL,MySql数据库增删改查