抛出疑无路?

【Flink 1.10】- 使用flink-jdbc连接器的方式与MySQL交互,读数据和写数据都能完成,但是在写数据时,发现Flink程序执行完毕之后,才能在MySQL中查询到插入的数据。即,虽然是流计算,但却不能实时的输出计算结果?From @罗鹏程

相关代码片段:
JDBCAppendTableSink.builder()    .setDrivername("com.mysql.jdbc.Driver")    .setDBUrl("jdbc:mysql://localhost/flink")    .setUsername("root")    .setPassword("123456")    .setParameterTypes(    BasicTypeInfo.INT_TYPE_INFO,    BasicTypeInfo.STRING_TYPE_INFO)    .setQuery("insert into batch_size values(?,?)")   .build()

再现又一村!

【Flink-1.10】这个问题是知道一秒钟,不知磨洋工的Case,在初学时候非常容易遇上,那么真的是Flink不能实时写入MySQL吗?当然不是,上面代码基础之上简单的加上一行,就解决问题了:

....setBatchSize(1) //将写入MySQL的buffer大小为1。..

向前一小步...

那么问题虽然解决了,根本原因是个啥呢?也许你看到这里会说,这问题很明显,就是Flink设计JDBC Sink的时候出于性能因素考虑,对写入buffer做了默认值设置,没错,这一点你说的很对,在【Flink-1.10】中JDBC OutputFormat的基类 AbstractJDBCOutputFormat里面和这相关的变量 DEFAULT_FLUSH_MAX_SIZE 默认值是5000,所以在你学习测试时候由于测试数据少(少于5000),数据一直在buffer中,直到数据源数据结束,作业也结束了,才将计算结果刷入MySQL,所以没有实时的(每条)写入MySQL。如下:

但这里还有个因素需要注意,那就是时间因素,上面DEFAULT_FLUSH_INTERVAL_MILLS默认值是0,这个相当于没有时间限制,一直等到buffer满了或者作业结束才能触发写出动作。也就是有些初学者,发现问题,即使故意 debug时候打上断点,不让作业结束,但是等到花儿都谢了,数据也没有写入到MySQL。

在【Flink 1.10】中AbstractJDBCOutputFormat有两个实现类,

分别对应了如下两类Sink:

所以在【Flink 1.10】中不论是 AppendTableSink和UpsertTableSink都会有同样的问题。不过UpsertTableSink时候用户可以设置时间,而AppendTableSink是连时间设置的入口都木有。

Flink 的锅?...

就这个问题而言,我个人认为不是用户的问题,是Flink 1.10代码设计有进一步改进的空间。在Flink 1.11 中社区的确重构了,对JDBCOutputFormat 打了 @Deprecated。感兴趣可以查阅 FLINK-17537 了解变化过程。但是在这个改进中,并没有对  DEFAULT_FLUSH_MAX_SIZE 默认值和 DEFAULT_FLUSH_INTERVAL_MILLS 默认值做变化,社区也在积极的讨论改进方案,想参与社区贡献或者了解最终讨论结果的可以查阅 FLINK-16497。

举一反三

当然在你学习过程中使用任何Sink的时候,只要没有实时写入,都可以找找是否有写出buffer和写出时间的限制设置。在这一点上,罗鹏程 也提到了Elasticsearch也有类似问题,需要调用setBulkFlushMaxActions进行设置。

众人拾柴

期待你典型问题的抛出... 我将知无不言...言无不尽... 我在又一村等你...

关注我的订阅号,我们并肩续航...

flink sink jdbc没有数据_No.2 为什么Flink无法实时写入MySQL?相关推荐

  1. mysql 设置默认值_为什么 Flink 无法实时写入 MySQL?

    作者:孙金城 摘要:本文为 Flink 生产环境应用中的疑问剖析,Flink 无法实时写入 MySQL 是初学者常见问题之一,由社区同学罗鹏程提出,Apache Flink PMC 孙金城(金竹)老师 ...

  2. Flink使用JdbcSink下沉数据到数据库MySQL

    Flink官网提供了JdbcSink的功能,如下: StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEn ...

  3. 9.FLINK Sink\API\自定义sink

    9.Sink 9.1.API 9.2.注意 9.3.自定义sink 9.Sink 9.1.API 1.ds.print 直接输出到控制台 2.ds.printToErr()直接输出到控制台,用红色 3 ...

  4. 大数据存储项目-基于Flink的高速公路ETC入深圳数据实时分析平台

    大数据项目总体流程分为以下4个方面: 数据采集,数据存储与管理,数据处理与分析,数据解释与可视化. 文章目录 数据源 项目要求 项目流程 1.数据爬取与清洗 2.jmeter模拟高并发数据流 3.ka ...

  5. flink实时消费kafka中oracle的DML数据写入mysql

    1.需要环境 zookeeper,小编安装环境为zookeeper-3.4.10 kakfa,小编安装环境为kafka_2.13-2.8.0 kafka-connect-oracle,此为kafka- ...

  6. Flink - 尚硅谷- 大数据高级 Flink 技术精讲 - 2

    七.Flink 时间语义与 Watermark 7.1 Flink 中的时间语义 7.2 设置 Event Time 7.3 水位线 - Watermark 7.3.1 基本概念 7.3.2 Wate ...

  7. 【3】flink sink

    [README] 本文记录了flink sink操作,输出目的存储器(中间件)包括 kafka: es: db: 等等有很多: 本文只给出了 sink2kafka的代码: 本文使用的flink为 1. ...

  8. Iceberg 在基于 Flink 的流式数据入库场景中的应用

    本文以流式数据入库的场景为基础,介绍引入 Iceberg 作为落地格式和嵌入 Flink sink 的收益,并分析了当前可实现的框架及要点. 应用场景 流式数据入库,是大数据和数据湖的典型应用场景.上 ...

  9. flink 复postgresql数据库数据

    1对操作用户进行权限设置 详见下文pg创建流复制账号步骤2.然后通过命令或者利用代码进行数据库数据的复制 安装flink 实例为1.13.6: 下载版本对应jar包 https://mvnreposi ...

最新文章

  1. spring:《spring实战》读后感三
  2. Linux下开启/关闭MySql Server命令
  3. linux学习心得(一)
  4. React Fiber源码逐个击破系列-scheduler
  5. Linq 数据库操作(增删改查)
  6. php 303,HTTP 的重定向301,302,303,307(转)
  7. 使用 Syslog 连接 Sentinel
  8. cp命令显示进度条_干货|| Linux常用命令大全
  9. centos mysql 二进制_CentOS 7.6 安装二进制Mysql
  10. idea ssm html增删改,intellij idea采用ssm框架实现的数据库增删查改demo
  11. Java中abstract关键字详解
  12. ADI官方提供的源码AD9361+ZC706 利用TCL构建Vivado工程,利用no-OS-master搭建SDK工程
  13. 简单实用的php爬虫系统
  14. html css字体最小,css字体最小是多少?
  15. 免费好用的证件扫描仪-扫描全能王
  16. 怎么看神经网络过早收敛_你的神经网络真的收敛了么?
  17. hping3对某网站发起ddos攻击
  18. Cyclical Quest CF235C
  19. 经典同步时序逻辑电路分析汇总(第八道)(同步三进制计数器)
  20. Longest Common Subsequence

热门文章

  1. 创建对象的多种方式以及优缺点
  2. 默认权限umask、文件系统权限、特殊权限
  3. iOS系统预览文件但不分享的实用技巧 (iOS10, QLPreviewController)
  4. 批量修改一张表格的多个sheet名
  5. nullnullProcessing Bitmaps Off the UI Thread 处理来自UI线程的位图
  6. 盘点我们最容易误解的30个英语句子
  7. Android深入浅出系列之Android开发环境搭建—SDK(三)
  8. 安装oracle时的x问题
  9. U盘量产及在虚拟机中测试
  10. 熊猫烧香病毒背后的***社会