摘要:本文为 Flink 生产环境应用中的疑问剖析,Flink 无法实时写入 MySQL 是初学者常见问题之一,由社区同学罗鹏程提出,Apache Flink PMC 孙金城(金竹)老师分享该问题的解决方案及分析思路。主要分为以下四部分:

  1. 问题描述

  2. 解决思路

  3. 原因剖析

  4. 举一反三

Tips:更多生产环境问题交流及反馈请订阅 Flink 中文邮件列表

问题描述

Flink 1.10 使用 flink-jdbc 连接器的方式与 MySQL 交互,读数据和写数据都能完成,但是在写数据时,发现 Flink 程序执行完毕之后,才能在 MySQL 中查询到插入的数据。即,虽然是流计算,但却不能实时的输出计算结果?

相关代码片段:

JDBCAppendTableSink.builder().setDrivername("com.mysql.jdbc.Driver").setDBUrl("jdbc:mysql://localhost/flink").setUsername("root").setPassword("123456").setParameterTypes(BasicTypeInfo.INT_TYPE_INFO,BasicTypeInfo.STRING_TYPE_INFO).setQuery("insert into batch_size values(?,?)").build()

如何解决?

Flink 1.10 这个问题是知道一秒钟,不知磨洋工的 Case,在初学时候非常容易遇上,那么真的是 Flink 不能实时写入 MySQL 吗?当然不是,上面代码基础之上简单的加上一行,就解决问题了:

...
.setBatchSize(1) //将写入MySQL的buffer大小为1。
..

原因剖析

那么问题虽然解决了,根本原因是个啥呢?也许你看到这里会说,这问题很明显,就是 Flink 设计 JDBC Sink 的时候出于性能因素考虑,对写入 buffer 做了默认值设置。

没错,这一点你说的很对,在 Flink 1.10 中 JDBC OutputFormat 的基类  AbstractJDBCOutputFormat 里面和这相关的变量 DEFAULT_FLUSH_MAX_SIZE 默认值是 5000,所以在你学习测试时候由于测试数据少(少于 5000),数据一直在 buffer 中,直到数据源数据结束,作业也结束了,才将计算结果刷入 MySQL,所以没有实时的(每条)写入 MySQL。如下:

但这里还有个因素需要注意,那就是时间因素,上面 DEFAULT_FLUSH_INTERVAL_MILLS 默认值是 0,这个相当于没有时间限制,一直等到 buffer 满了或者作业结束才能触发写出动作。

也就是有些初学者,发现问题,即使故意 debug 时候打上断点,不让作业结束,但是等到花儿都谢了,数据也没有写入到 MySQL。

在 Flink 1.10 中 AbstractJDBCOutputFormat 有两个实现类:

分别对应了如下两类 Sink:

所以在 Flink 1.10 中不论是 AppendTableSink 和 UpsertTableSink 都会有同样的问题。不过 UpsertTableSink 时用户可以设置时间,而 AppendTableSink 是连时间设置的入口都木有。

那么,是 Flink 的锅?

就这个问题而言,我个人认为不是用户的问题,是 Flink 1.10 代码设计有进一步改进的空间。在 Flink 1.11 中社区的确重构了,对 JDBCOutputFormat 打了  @Deprecated。感兴趣可以查阅 FLINK-17537 了解变化过程。但是在这个改进中,并没有对 DEFAULT_FLUSH_MAX_SIZE 默认值和 DEFAULT_FLUSH_INTERVAL_MILLS 默认值做变化,社区也在积极的讨论改进方案,想参与社区贡献或者了解最终讨论结果的可以查阅 FLINK-16497。

举一反三

当然在你学习过程中使用任何 Sink 的时候,只要没有实时写入,都可以找找是否有写出 buffer 和写出时间的限制设置。在这一点上,罗鹏程也提到了 Elasticsearch 也有类似问题,需要调用 setBulkFlushMaxActions 进行设置。

大家在学习、使用 Flink 的过程中遇到的问题都可以通过 Flink 中文邮件列表进行反馈,Flink 核心开发者及社区一线用户在线答疑交流!

2 分钟快速订阅 Flink 中文邮件列表

Apache Flink 中文邮件列表订阅流程:

  1. 发送任意邮件到 user-zh-subscribe@flink.apache.org

  2. 收到官方确认邮件

  3. 回复该邮件 confirm 即可订阅

订阅成功后将收到 Flink 官方的中文邮件列表的消息,您可以向 user-zh@flink.apache.org 发邮件提问也可以帮助别人解答问题,动动手测试一下!

以上是对该问题解决方案及思路的分享,希望能对你有所帮助,也期待大家遇到的典型问题能及时反馈至社区邮件列表。


关注 Flink 中文社区,获取更多技术干货

在看」吗?

为什么 Flink 无法实时写入 MySQL?相关推荐

  1. mysql 设置默认值_为什么 Flink 无法实时写入 MySQL?

    作者:孙金城 摘要:本文为 Flink 生产环境应用中的疑问剖析,Flink 无法实时写入 MySQL 是初学者常见问题之一,由社区同学罗鹏程提出,Apache Flink PMC 孙金城(金竹)老师 ...

  2. flink sink jdbc没有数据_No.2 为什么Flink无法实时写入MySQL?

    抛出疑无路? [Flink 1.10]- 使用flink-jdbc连接器的方式与MySQL交互,读数据和写数据都能完成,但是在写数据时,发现Flink程序执行完毕之后,才能在MySQL中查询到插入的数 ...

  3. Flink CDC 实时同步mysql

    前言 在实际开发中,需要做数据同步的场景是非常多的,比如不同的应用之间不想直接通过RPC的方式进行数据交互,或者说下游应用需要检测来自上游应用的某些业务指标数据的变化时,这些都可以考虑使用数据同步的方 ...

  4. flink实时写入数据到 clickhouse 报错Could not initialize class ru.yandex.clickhouse.ClickHouseUtil

    问题介绍: 项目在使用clickhouse jdbc 将flink数据实时写入clickhouse时候,报错: Caused by: java.lang.NoClassDefFoundError: C ...

  5. Flink 1.9 实战:使用 SQL 读取 Kafka 并写入 MySQL

    上周六在深圳分享了<Flink SQL 1.9.0 技术内幕和最佳实践>,会后许多小伙伴对最后演示环节的 Demo 代码非常感兴趣,迫不及待地想尝试下,所以写了这篇文章分享下这份代码.希望 ...

  6. demo flink写入kafka_Flink结合Kafka实时写入Iceberg实践笔记

    前言 上文提到使用Flink SQL写入hadoop catalog 的iceberg table 的简单示例,这次我就flink 消费kafka 流式写入iceberg table做一个验证,现记录 ...

  7. 114.Spark大型电商项目-广告点击流量实时统计-使用高性能方式将实时计算结果写入MySQL中

    目录 误区 Spark Streaming foreachRDD的正确使用方式 对于这种实时计算程序的mysql插入,有两种pattern(模式) 代码 AdUserClickCount.java I ...

  8. Flink结合Kafka实时写入Iceberg实践笔记

    前言 上文提到使用Flink SQL写入hadoop catalog 的iceberg table 的简单示例,这次我就flink 消费kafka 流式写入iceberg table做一个验证,现记录 ...

  9. 日均百亿级日志处理:微博基于Flink的实时计算平台建设

    来自:DBAplus社群 作者介绍 吕永卫,微博广告资深数据开发工程师,实时数据项目组负责人. 黄鹏,微博广告实时数据开发工程师,负责法拉第实验平台数据开发.实时数据关联平台.实时算法特征数据计算.实 ...

  10. flink的实时数据同步

    构建基于flink.kafka.MySQL.hbase的实时数仓,实现: (1)业务数据全量同步到数据仓库: (2)业务数据实时增量同步到数据仓库,使用Kafka+canal实现增量数据采集. 数仓架 ...

最新文章

  1. centos安装JDK与Tomcat
  2. opengl载入多个3ds模型失败记
  3. linux vi 撤销重做于前进后退--转
  4. python xlsxwriter 画图_Python XlsxWriter模块Chart类用法实例分析
  5. java 下对字符串的格式化
  6. fitbit手表中文说明书_如何获取和分析Fitbit睡眠分数
  7. Docker安装java-Zookeeper进行操作
  8. 《图像超分》一些论文走读(SRCNN ,ESPCN ,VDSR ,SRGAN)
  9. JSLite 的目标:缩小体积,做到 jQuery-free
  10. 纯Git实现前端项目打包部署
  11. 计算机修改桌面图标大小,windows更改桌面图标大小设置
  12. prometheus安装与开启并设置开机自启
  13. 职称计算机初级应知应会书,职称计算机考试用书
  14. mysql存储爬虫图片_世纪佳缘信息爬取存储到mysql,下载图片到本地,从数据库选取账号对其发送消息更新发信状态...
  15. 介绍2款最流行的画PCB工具
  16. pptx文件怎么打开
  17. C++:实现量化默认概率曲线测试实例
  18. Java实现 LeetCode 373 查找和最小的K对数字
  19. Solaris 10u11 安装python2.7.10
  20. vrf名称_VRF中央空调中是什么意思

热门文章

  1. Cesium:修改点击显示的infoBox内容
  2. jQuery:设置body的背景
  3. Git:取消合并方式
  4. Cesium:向地图中添加线的方法
  5. 深度学习_pytorch_深度学习中的tensor介绍及常用操作
  6. 用AXIS2发布WebService的方法 使用eclipse插件生成服务端和客户端
  7. C语言union类型和C语言 uchar类型的个人见解
  8. 单目标跟踪paper小综述
  9. HTTP、Asp.net管道与IIS
  10. 【绝迹篇】RSA加密算法(私钥加签公钥验签)