背景

开发Flink应用要求计算结果实时写入数据库的,一般业务写入TPS在600-800,如果生产同时跑十几个任务,数据库写入TPS接近一万,对数据库造成了较大压力,使用窗口的优化方向不可行:

1. 计算任务的key值较为分散(如用户,商户维度),小窗口(1分钟、5分钟)计算无法减少写入次数,大窗口(10分钟、1小时)实时性太差;

2. 无法保证上游流水100%有序准时到达,使用窗口计算容易漏算流水;


优化思路

使用Flink应用对流水进行计算和统计,结果一般字段较少,每条计算结果的数据量不大,如果开启批量写入,可以降低和数据库之间的网络交换次数,也可提升数据库的数据写入效率;


数据库连接配置

  • rewriteBatchedStatements

配置此参数为true以后,jdbc驱动会在executeBatch时将SQL改写,将多条Insert语句合并为一条,效果如下:

改写前:

INSERT INTO `t` (`a`) VALUES(10);
INSERT INTO `t` (`a`) VALUES(11);
INSERT INTO `t` (`a`) VALUES(12);

改写后:

INSERT INTO `t` (`a`) VALUES(10),(11),(12);
  • useServerPrepStmts

这个参数是用来节省数据库SQL解析和执行计划消耗的,开启useServerPrepStmts以后,服务器可以将已经解析的SQL反复使用,只在每次客户端提交新的写入请求时填入'?'占位符的参数;

值得注意的是,useServerPrepStmts开启后,客户端不再对SQL预编译,也就是说上面说的SQL改写也不会发生,所以在批量写入时,把此参数关闭,或保持默认配置;

关于rewriteBatchedStatements和useServerPrepStmts配合使用的性能测试,参考:

MySql PreparedStatement executeBatch过慢问题 - 灰信网(软件开发博客聚合)

  • cachePrepStmts

虽然 useServerPrepStmts = true 能让服务端执行预处理语句,但默认情况下客户端每次执行完后会 close 预处理语句,并不会复用,这样预处理的效率甚至不如文本执行。所以建议开启 useServerPrepStmts = true 后同时配置 cachePrepStmts = true,这会让客户端缓存预处理语句。

  • prepStmtCacheSqlLimit

在配置 cachePrepStmts 后还需要注意 prepStmtCacheSqlLimit 配置(默认为 256),该配置控制客户端缓存预处理语句的最大长度,超过该长度将不会被缓存。

在一些场景 SQL 的长度可能超过该配置,导致预处理 SQL 不能复用,建议根据应用 SQL 长度情况决定是否需要调大该值。

  • prepStmtCacheSqlLimit

控制缓存的预处理语句数目(默认为 25),如果应用需要预处理的 SQL 种类很多且希望复用预处理语句,可以调大该值。


配置参考

jdbc:mysql://localhost:3306/mydb?charset=utf8mb4&useSSL=false&useConfigs=maxPerformance& cachePrepStmts=true&prepStmtCacheSqlLimit=8192&prepStmtCacheSize=1024&rewriteBatchedStatements=true&allowMultiQueries=true

SQL改写

原先SQL是这么写的:

INSERT INTO  rts_clear_branch_day_trans(stl_branch_id,stl_org_id,time_date,trans_count,trans_amount,mer_fee,acq_shr_amt)
VALUES(?,?,?,?,?,?,?) ON DUPLICATE KEY UPDATEtrans_count = ?,trans_amount = ?,mer_fee = ? ,acq_shr_amt = ?

改写后:

INSERT INTO  rts_clear_branch_day_trans(stl_branch_id,stl_org_id,time_date,trans_count,trans_amount,mer_fee,acq_shr_amt)
VALUES(?,?,?,?,?,?,?) ON DUPLICATE KEY UPDATEtrans_count = VALUES(`trans_count`),trans_amount = VALUES(`trans_amount`),mer_fee = VALUES(`mer_fee`),acq_shr_amt = VALUES(`acq_shr_amt`)

Batch配置

new JdbcExecutionOptions.Builder().withBatchSize(100).withBatchIntervalMs(1000).withMaxRetries(3).build(),

【Flink系列】开启jdbc批量写入相关推荐

  1. 《从0到1学习Flink》—— Flink 读取 Kafka 数据批量写入到 MySQL

    <!-- more --> 前言 之前其实在 <从0到1学习Flink>-- 如何自定义 Data Sink ? 文章中其实已经写了点将数据写入到 MySQL,但是一些配置化的 ...

  2. 【SpringBoot DB系列】Jooq批量写入采坑记录

    [SpringBoot DB系列]Jooq批量写入采坑记录 前面介绍了jooq的三种批量插入方式,结果最近发现这里面居然还有一个深坑,我以为的批量插入居然不是一次插入多条数据,而是一条一条的插入-,这 ...

  3. 写入mysql_《从0到1学习Flink》—— Flink 读取 Kafka 数据批量写入到 MySQL

    前言 之前其实在 <从0到1学习Flink>-- 如何自定义 Data Sink ? 文章中其实已经写了点将数据写入到 MySQL,但是一些配置化的东西当时是写死的,不能够通用,最近知识星 ...

  4. flink批量写入oracle,批量写入目标表存在重复写入问题

    测试场景 源表字符串类型数据存储到目标表整型数据类型,批量写入,发现有重复写入目标表问题 测试的时候发现:当批量写入抛出异常的时候,产生2个结果,1.写入部分数据 2.程序转到处理异常的代码块,在异常 ...

  5. Flink窗口+触发器 ,实现定时、定量批量写入Hbase不同的表

    需求案例 消费kafka中的数据,根据业务类型不同批量写入不同的Hbase表. 按照数据延迟和单次写入数据量要求,写入库的时候采用两种模式: 定时:满足指定数据延迟 定量:满足指定数据量 技术点解析 ...

  6. Flink 读取 Kafka 消息并批量写入到 MySQL实例

    前言说明 环境搭建可参考 <kafka+flink集成实例> 本实例主要实现功能如下: 模拟消息生成->Kafka->Flink->Mysql 其中Flink做数据流收集 ...

  7. 【clickhouse】flink jdbc 方式写入 clickhouse 报错 request to {}->http://xxx:8123: Broken pipe

    文章目录 1.概述 本文为博主九师兄(QQ:541711153 欢迎来探讨技术)原创文章,未经允许博主不允许转载.可以加我问问题,免费解答,有问题可以先私聊我,本人每天都在线,会帮助需要的人. 但是本 ...

  8. mybatis入门学习(九) -DB环境切换、使用注解、事务提交、获取自增ID、多参数传参、鉴别器、内置参数、批量写入、Oracle字段无法插入 null 值

    一.多数据库环境切换 1.config配置: <!-- default="mydemo" 指定连接的数据库 --><environments default=&q ...

  9. 批量写入tidb提高写入效率

    在url中增加allowMultiQueries=true&rewriteBatchedStatements=true&useConfigs=maxPerformance&us ...

最新文章

  1. 构建之法第十,十一,十二章阅读
  2. javase基础socket编程之局域网聊天,局域网文件共享
  3. 【深度学习】深度学习入门——BP网络反向传播
  4. 数据治理是开展数据分析的前提
  5. 两台计算机通过路由器连接网络,如何设置将两台计算机连接到Internet的路由器...
  6. 如何让地面不起灰_水泥地面起灰怎么办?
  7. ckeditor 部分保存按钮去除_怎样去除mp4视频中的水印呢
  8. 新萝卜家园 GhostXP SP3 一键极速装机版 2012.05
  9. Kali linux 2016.2(Rolling)中的Exploits模块详解
  10. java封装继承多态实验总结_java封装继承多态理解3000字论文
  11. android手势解锁代码,纯android代码实现九宫格手势密码
  12. java 柱状图 宽度_Java 创建Excel图表
  13. Ubuntu deb文件 安装 MySQL
  14. MinIO客户端mc使用
  15. JAVA泛型通配符T,E,K,V区别,T以及ClassT,ClassT的区别
  16. 一级计算机考试计算,全国计算机一级MS OFFICE等级考试计算题(3)
  17. 【UOJ#228】 基础数据结构练习题
  18. Python中os.listdir() 函数用法及实例
  19. 视频编解码 — 码控算法
  20. 大学计算机课实验,大学计算机课程实验教学平台的设计与实现

热门文章

  1. 初夏小谈:QZ聊天系统大体框架(一)
  2. windows10 设置虚拟网卡/ip
  3. 计算机设备编号中字母代号对照,计算机项目符号和编号
  4. java日期减天数_使用Java中的Calendar.DATE从当前日期减去天数
  5. 是不是年龄大的人就不能找到一份好工作?
  6. 2.03_01_Python网络爬虫urllib2库
  7. GRE over IPSec vs IPSec over GRE
  8. 新库上线 | CnOpenData舆情云数据
  9. python字符 * 和 ** 的拆包(打散)功能
  10. 使用IntentService解决点我达骑手APP消息提醒机制中的优先级排序问题