flink sink jdbc没有数据_No.2 为什么Flink无法实时写入MySQL?
抛出疑无路?
【Flink 1.10】- 使用flink-jdbc连接器的方式与MySQL交互,读数据和写数据都能完成,但是在写数据时,发现Flink程序执行完毕之后,才能在MySQL中查询到插入的数据。即,虽然是流计算,但却不能实时的输出计算结果?From @罗鹏程
JDBCAppendTableSink.builder() .setDrivername("com.mysql.jdbc.Driver") .setDBUrl("jdbc:mysql://localhost/flink") .setUsername("root") .setPassword("123456") .setParameterTypes( BasicTypeInfo.INT_TYPE_INFO, BasicTypeInfo.STRING_TYPE_INFO) .setQuery("insert into batch_size values(?,?)") .build()
再现又一村!
【Flink-1.10】这个问题是知道一秒钟,不知磨洋工的Case,在初学时候非常容易遇上,那么真的是Flink不能实时写入MySQL吗?当然不是,上面代码基础之上简单的加上一行,就解决问题了:
....setBatchSize(1) //将写入MySQL的buffer大小为1。..
向前一小步...
那么问题虽然解决了,根本原因是个啥呢?也许你看到这里会说,这问题很明显,就是Flink设计JDBC Sink的时候出于性能因素考虑,对写入buffer做了默认值设置,没错,这一点你说的很对,在【Flink-1.10】中JDBC OutputFormat的基类 AbstractJDBCOutputFormat里面和这相关的变量 DEFAULT_FLUSH_MAX_SIZE 默认值是5000,所以在你学习测试时候由于测试数据少(少于5000),数据一直在buffer中,直到数据源数据结束,作业也结束了,才将计算结果刷入MySQL,所以没有实时的(每条)写入MySQL。如下:
但这里还有个因素需要注意,那就是时间因素,上面DEFAULT_FLUSH_INTERVAL_MILLS默认值是0,这个相当于没有时间限制,一直等到buffer满了或者作业结束才能触发写出动作。也就是有些初学者,发现问题,即使故意 debug时候打上断点,不让作业结束,但是等到花儿都谢了,数据也没有写入到MySQL。
在【Flink 1.10】中AbstractJDBCOutputFormat有两个实现类,
分别对应了如下两类Sink:
所以在【Flink 1.10】中不论是 AppendTableSink和UpsertTableSink都会有同样的问题。不过UpsertTableSink时候用户可以设置时间,而AppendTableSink是连时间设置的入口都木有。
Flink 的锅?...
就这个问题而言,我个人认为不是用户的问题,是Flink 1.10代码设计有进一步改进的空间。在Flink 1.11 中社区的确重构了,对JDBCOutputFormat 打了 @Deprecated。感兴趣可以查阅 FLINK-17537 了解变化过程。但是在这个改进中,并没有对 DEFAULT_FLUSH_MAX_SIZE 默认值和 DEFAULT_FLUSH_INTERVAL_MILLS 默认值做变化,社区也在积极的讨论改进方案,想参与社区贡献或者了解最终讨论结果的可以查阅 FLINK-16497。
举一反三
当然在你学习过程中使用任何Sink的时候,只要没有实时写入,都可以找找是否有写出buffer和写出时间的限制设置。在这一点上,罗鹏程 也提到了Elasticsearch也有类似问题,需要调用setBulkFlushMaxActions进行设置。
众人拾柴
期待你典型问题的抛出... 我将知无不言...言无不尽... 我在又一村等你...
关注我的订阅号,我们并肩续航...
flink sink jdbc没有数据_No.2 为什么Flink无法实时写入MySQL?相关推荐
- mysql 设置默认值_为什么 Flink 无法实时写入 MySQL?
作者:孙金城 摘要:本文为 Flink 生产环境应用中的疑问剖析,Flink 无法实时写入 MySQL 是初学者常见问题之一,由社区同学罗鹏程提出,Apache Flink PMC 孙金城(金竹)老师 ...
- Flink使用JdbcSink下沉数据到数据库MySQL
Flink官网提供了JdbcSink的功能,如下: StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEn ...
- 9.FLINK Sink\API\自定义sink
9.Sink 9.1.API 9.2.注意 9.3.自定义sink 9.Sink 9.1.API 1.ds.print 直接输出到控制台 2.ds.printToErr()直接输出到控制台,用红色 3 ...
- 大数据存储项目-基于Flink的高速公路ETC入深圳数据实时分析平台
大数据项目总体流程分为以下4个方面: 数据采集,数据存储与管理,数据处理与分析,数据解释与可视化. 文章目录 数据源 项目要求 项目流程 1.数据爬取与清洗 2.jmeter模拟高并发数据流 3.ka ...
- flink实时消费kafka中oracle的DML数据写入mysql
1.需要环境 zookeeper,小编安装环境为zookeeper-3.4.10 kakfa,小编安装环境为kafka_2.13-2.8.0 kafka-connect-oracle,此为kafka- ...
- Flink - 尚硅谷- 大数据高级 Flink 技术精讲 - 2
七.Flink 时间语义与 Watermark 7.1 Flink 中的时间语义 7.2 设置 Event Time 7.3 水位线 - Watermark 7.3.1 基本概念 7.3.2 Wate ...
- 【3】flink sink
[README] 本文记录了flink sink操作,输出目的存储器(中间件)包括 kafka: es: db: 等等有很多: 本文只给出了 sink2kafka的代码: 本文使用的flink为 1. ...
- Iceberg 在基于 Flink 的流式数据入库场景中的应用
本文以流式数据入库的场景为基础,介绍引入 Iceberg 作为落地格式和嵌入 Flink sink 的收益,并分析了当前可实现的框架及要点. 应用场景 流式数据入库,是大数据和数据湖的典型应用场景.上 ...
- flink 复postgresql数据库数据
1对操作用户进行权限设置 详见下文pg创建流复制账号步骤2.然后通过命令或者利用代码进行数据库数据的复制 安装flink 实例为1.13.6: 下载版本对应jar包 https://mvnreposi ...
最新文章
- spring:《spring实战》读后感三
- Linux下开启/关闭MySql Server命令
- linux学习心得(一)
- React Fiber源码逐个击破系列-scheduler
- Linq 数据库操作(增删改查)
- php 303,HTTP 的重定向301,302,303,307(转)
- 使用 Syslog 连接 Sentinel
- cp命令显示进度条_干货|| Linux常用命令大全
- centos mysql 二进制_CentOS 7.6 安装二进制Mysql
- idea ssm html增删改,intellij idea采用ssm框架实现的数据库增删查改demo
- Java中abstract关键字详解
- ADI官方提供的源码AD9361+ZC706 利用TCL构建Vivado工程,利用no-OS-master搭建SDK工程
- 简单实用的php爬虫系统
- html css字体最小,css字体最小是多少?
- 免费好用的证件扫描仪-扫描全能王
- 怎么看神经网络过早收敛_你的神经网络真的收敛了么?
- hping3对某网站发起ddos攻击
- Cyclical Quest CF235C
- 经典同步时序逻辑电路分析汇总(第八道)(同步三进制计数器)
- Longest Common Subsequence