文章目录

  • 一、MiniBatch的演进思路
    • 1、MiniBatch版本
    • 2、适用场景
    • 3、普通聚合与MiniBatch聚合对比
      • A、Simple Aggregation普通聚合
      • B、MiniBatch Aggregation微批聚合
  • 二、MiniBatch作用的SQL语句
    • 分类1、 window agg
    • 分类2、group agg
    • 分类3、over agg
  • 三、MiniBatch三类优化手段
    • 1、Local-Global聚合(本地-全局聚合)
    • 2、Partial-Final聚合(解决COUNT DISTINCT热点问题)
    • 3、Incremental增量聚合
  • 四、如何开启MiniBatch
    • 1、table.exec.mini-batch.enabled
    • 2、table.exec.mini-batch.allow-latency
    • 3、table.exec.mini-batch.size
    • 4、table.optimizer.reuse-sub-plan-enabled

一、MiniBatch的演进思路

1、MiniBatch版本

Flink 1.9.0 SQL(Blink Planner) 性能优化中一项重要的改进就是升级了微批模型,即 MiniBatch(也称作MicroBatch或MiniBatch2.0),在支持高吞吐场景发挥了重要作用。

MiniBatch与早期的MiniBatch1.0在微批的触发机制略有不同。原理同样是缓存一定的数据后再触发处理,以减少对State的访问,从而提升吞吐和减少数据的输出量

MiniBatch与早期的MiniBatch1.0对比如下:
1、MiniBatch1.0主要依靠在每个Task上注册的Timer线程来触发微批,需要消耗一定的线程调度性能。
2、MiniBatch是MiniBatch1.0的升级版,主要要基于事件消息来触发微批,事件消息会按您指定的时间间隔在源头插入。MiniBatch在元素序列化效率、反压表现、吞吐和延迟性能上都要优于胜于MiniBatch1.0

2、适用场景

微批处理是增加延迟来换取高吞吐的策略,如果您有超低延迟的要求,不建议开启微批处理。通常对于聚合的场景,微批处理可以显著的提升系统性能,建议开启。

3、普通聚合与MiniBatch聚合对比

A、Simple Aggregation普通聚合

在未开启任何聚合优化前,执行SQL():

SELECT key, SUM(value) FROM T GROUP BY key

当未开启 MicroBatch 时,Aggregate 的处理模式是每来一条数据,查询一次状态,进行聚合计算,然后写入一次状态。当有 4条数据时,需要操作 2*4 次状态

B、MiniBatch Aggregation微批聚合

当开启 MicroBatch 时,对于缓存下来的 N 条数据一起触发,同 key 的数据只会读写状态一次。例如下缓存的 4 条 A 的记录,只会对状态读写各一次。所以当数据的 key 的重复率越大,攒批的大小越大,那么对状态的访问会越少,得到的吞吐量越高。

二、MiniBatch作用的SQL语句

MiniBatch主要作用于聚合(Group By)语句中,且不带window的场景(即分类2)。
我们先看下聚合分类:

分类1、 window agg

示例:select count(a) from t group by tumble(ts, interval ’10’ second), b
解析:以10秒翻转窗口和字段b聚合,MiniBatch不能作用的场景

分类2、group agg

示例:select count(a) from t group by b
解析:以字段a聚合,MiniBatch可以作用的场景

分类3、over agg

示例:select count(a) over (partition by b order by c) from t
解析:over window,MiniBatch不能作用的场景

三、MiniBatch三类优化手段

上一章节我们说明了MiniBatch只能作用于分类2(group aggregate且不带window场景),这个聚合场景下,微批处理具有三类优化手段:

  • Local-Global聚合(本地-全局聚合)
  • Partial-Final聚合(解决COUNT DISTINCT热点问题)
  • Incremental增量聚合

1、Local-Global聚合(本地-全局聚合)

Local-Global聚合优化与Spark Structrued Streaming聚合思路类似:

  • 上游节点本地攒一批数据进行聚合(localAgg),并输出这次微批的增量值(Accumulator)
  • 第二阶段再将收到的Accumulator合并(merge),得到最终的结果(globalAgg)
  • 原理图解:
    LocalGlobal本质上能够靠localAgg的预聚合筛除部分倾斜数据,从而降低globalAgg的热点,提升性能。可以结合下图理解LocalGlobal如何解决数据倾斜的问题
  • 适用场景:
    LocalGlobal适用于提升如SUM、COUNT、MAX、MIN和AVG等普通聚合的性能,能提高算子吞吐量,也能有效解决常见数据热点问题

  • 源码Rule规则:TwoStageOptimizedAggregateRule(两阶段聚合规则)
  • 物理计划算子:上述规则内部#createTwoStageAgg()创建了StreamExecLocalGroupAggregate、StreamExecGlobalGroupAggregate物理计划节点,分别对应Local、Global微批聚合实现
  • 实现函数Fuction:MiniBatchLocalGroupAggFunction、MiniBatchGlobalGroupAggFunction,分别对应Local、Global微批聚合实现
  • 需要的额外配置:
    table.optimizer.agg-phase-strategy开启(默认值已为AUTO开启,所以不用配置)
  • 如何判断是否生效:
    FLink Web UI观察最终生成的拓扑图的节点名字中是否包含GlobalGroupAggregate或LocalGroupAggregate

2、Partial-Final聚合(解决COUNT DISTINCT热点问题)

上一小节的Local-Global优化能针对常见普通聚合有较好的效果(如SUM、COUNT、MAX、MIN和AVG)。但是对于COUNT DISTINCT收效不明显,原因是COUNT DISTINCT在local聚合时,对于DISTINCT KEY的去重率不高,导致在Global节点仍然存在热点

实时计算历史版本中,用户为了解决COUNT DISTINCT的热点问题,通常会手动改写成两层聚合(增加按distinct key取模的打散层),自FLink1.9.0版本开始,实时计算提供了COUNT DISTINCT自动打散,即Partial-Final优化,您无需自行改写为两层聚合。Partial-Final和Local-Global的原理对比参见下图。

  • 适用场景:
    使用COUNT DISTINCT且聚合节点性能无法满足时。
  • 说明:
    PartialFinal优化方法不能在包含UDAF的Flink SQL中使用。
    数据量不大的情况下不建议使用PartialFinal优化方法。PartialFinal优化会自动打散成两层聚合,引入额外的网络Shuffle,在数据量不大的情况下,可能反而会浪费资源。


一个Partial-Final优化过程示例:

原SQL:
SELECT day, COUNT(DISTINCT buy_id) as cnt FROM T GROUP BY day,

对所需DISTINCT字段buy_id模1024自动打散后,SQL:
SELECT day, SUM(cnt) total
FROM (
SELECT day, MOD(buy_id, 1024), COUNT(DISTINCT buy_id) as cnt
FROM T GROUP BY day, MOD(buy_id, 1024))
GROUP BY day

  • 源码Rule规则:SplitAggregateRule(拆分聚合规则,注意是作用于logical逻辑计划阶段,拆分出来的两个聚合GROUP还会参与local-global等优化)
  • 需要的额外配置:
    table.optimizer.distinct-agg.split.enabled开启(默认值已为false,需要设置为true)
    table.optimizer.distinct-agg.split.bucket-num(默认值1024,可以根据业务数据量和热点情况,设置这个取模值)
  • 如何判断是否生效:
    FLink Web UI观察最终生成的拓扑图的节点名中是否包含Expand节点,或者原来一层的聚合变成了两层的聚合

3、Incremental增量聚合

增量聚合是对partial-final和local-global拆分出来的聚合物理算子进行进一步优化,例如对一个带有COUNT DISTINCT和聚合场景,同时开启partial-final和local-global优化配置,最后会得到4个以上相关算子。

如果上一个聚合算子的输出字段(partial)与下一个聚合算子(local)的输入字段一样,就可以匹配上IncrementalAggregateRule,进行算子的合并。如图:

  • 源码Rule规则:IncrementalAggregateRule(增量聚合规则,注意是作用于physical物理计划阶段)
  • 物理计划算子:上述规则内部#onMatch()创建了StreamExecIncrementalGroupAggregate、StreamExecGlobalGroupAggregate物理计划节点,分别对应Local、Global微批聚合实现
  • 实现函数Fuction:MiniBatchGlobalGroupAggFunction
  • 需要的额外配置:
    table.optimizer.incremental-agg-enabled开启(默认值已为true,所以不用修改)
  • 如何判断是否生效:
    FLink Web UI观察最终生成的拓扑图的节点名中是否包含IncrementalGroup节点

四、如何开启MiniBatch

1、table.exec.mini-batch.enabled

 public static final ConfigOption<Boolean> TABLE_EXEC_MINIBATCH_ENABLED =key("table.exec.mini-batch.enabled").defaultValue(false).withDescription("Specifies whether to enable MiniBatch optimization. " +"MiniBatch is an optimization to buffer input records to reduce state access. " +"This is disabled by default. To enable this, users should set this config to true. " +"NOTE: If mini-batch is enabled, 'table.exec.mini-batch.allow-latency' and " +"'table.exec.mini-batch.size' must be set.");
  • 解析:MiniBatch开关配置,默认为false关闭,优化时需要设置为true
  • 作用的物理计划算子:
    ①、StreamExecGroupAggregate(普通group by聚合,且未开启local-global本地至全局两阶段聚合优化,对应的物理计划算子,后续讲解)
    ②、StreamExecGlobalGroupAggregate(普通group by聚合,开启了local-global本地至全局两阶段聚合优化,对应的物理计划算子)

2、table.exec.mini-batch.allow-latency

 public static final ConfigOption<String> TABLE_EXEC_MINIBATCH_ALLOW_LATENCY =key("table.exec.mini-batch.allow-latency").defaultValue("-1 ms").withDescription("The maximum latency can be used for MiniBatch to buffer input records. " +"MiniBatch is an optimization to buffer input records to reduce state access. " +"MiniBatch is triggered with the allowed latency interval and when the maximum number of buffered records reached. " +"NOTE: If " + TABLE_EXEC_MINIBATCH_ENABLED.key() + " is set true, its value must be greater than zero.");
  • 解析:MiniBatch缓存数据最大的时间间隔,超过这个间隔,强制触发已聚合数据写出给下游,默认-1毫秒,可以根据需求和业务容忍的延迟,调整为5000毫秒等
  • 作用的物理计划算子:StreamExecWatermarkAssigner(水印SQL物理计划),内部可以创建MiniBatchAssignerOperator,以上述配置的周期,将当前Watermark发送给下游,触发计算和写出

3、table.exec.mini-batch.size

public static final ConfigOption<Long> TABLE_EXEC_MINIBATCH_SIZE =key("table.exec.mini-batch.size").defaultValue(-1L).withDescription("The maximum number of input records can be buffered for MiniBatch. " +"MiniBatch is an optimization to buffer input records to reduce state access. " +"MiniBatch is triggered with the allowed latency interval and when the maximum number of buffered records reached. " +"NOTE: MiniBatch only works for non-windowed aggregations currently. If " + TABLE_EXEC_MINIBATCH_ENABLED.key() +" is set true, its value must be positive.");
  • 解析:MiniBatch缓存数据最大数目,超过这个数目,强制触发已聚合数据写出给下游,默认-1,可以根据需求和业务每秒数据量,调整为需要的值,例如50000
  • 作用的物理计划算子:AggregateUtil中创建CountBundleTrigger(以数目为阈值的触发器,实现较为简单),其#onElement()方法会调用AbstractMapBundleOperator#finishBundle()结束一段聚合缓存,为核心逻辑,本文后面章节分析
  • 注:table.exec.mini-batch.size与上一节table.exec.mini-batch.allow-latency为或关系,达到阈值触发聚合写出给下游

4、table.optimizer.reuse-sub-plan-enabled

 public static final ConfigOption<Boolean> TABLE_OPTIMIZER_REUSE_SUB_PLAN_ENABLED =key("table.optimizer.reuse-sub-plan-enabled").defaultValue(true).withDescription("When it is true, the optimizer will try to find out duplicated sub-plans and reuse them.");
  • 解析:复用子查询,这个配置是一个通用的配置,并不只是作用于聚合SQL,建议开启,如果两个SQL语句的from表及project(select)后字段一模一样,就可以将两个逻辑节点合并为一个
  • 示例:
    Scan1、Scan2 与是统一个表名, Project1与Project2字段一样,Filter1与Filter2逻辑有区别,可以使用reuse-sub-plan这个优化
         Join                      Join/      \                  /      \Filter1  Filter2          Filter1  Filter2|        |        =>       \     /Project1 Project2            Project1|        |                   |Scan1    Scan2               Scan1
  • 作用的物理计划算子:实现逻辑在SubplanReuser,感兴趣的读者可以深入分析

FLink聚合性能优化--MiniBatch分析相关推荐

  1. 【Elasticsearch】es Elasticsearch 聚合性能优化六大猛招

    1.概述 参考:Elasticsearch 聚合性能优化六大猛招

  2. 转:Android应用开发性能优化完全分析

    转自:http://blog.csdn.net/yanbober/article/details/48394201 1 背景 其实有点不想写这篇文章的,但是又想写,有些矛盾.不想写的原因是随便上网一搜 ...

  3. 【安卓开发系列 -- APP 】APP 性能优化 -- 崩溃分析

    [安卓开发系列 -- APP ]APP 性能优化 -- 崩溃分析 [1]Native Crash 分析示例 [1.1]Linux 编译 breadpad 下载 breadpad 源码 git clon ...

  4. Android应用开发性能优化完全分析

    1 背景 其实有点不想写这篇文章的,但是又想写,有些矛盾.不想写的原因是随便上网一搜一堆关于性能的建议,感觉大家你一总结.我一总结的都说到了很多优化注意事项,但是看过这些文章后大多数存在一个问题就是只 ...

  5. Spark的性能优化案例分析(下)

    前言 Spark的性能优化案例分析(上),介绍了软件性能优化必须经过进行性能测试,并在了解软件架构和技术的基础上进行.今天,我们通过几个 Spark 性能优化的案例,看一看所讲的性能优化原则如何落地. ...

  6. Elasticsearch 聚合性能优化六大猛招

    Elasticsearch 最少必要知识实战教程直播回放 1.问题引出 默认情况下,Elasticsearch 已针对大多数用例进行了优化,确保在写入性能和查询性能之间取得平衡.我们将介绍一些聚合性能 ...

  7. Flink SQL 性能优化

    文章目录 MiniBatch 聚合 Local-Global 聚合 拆分 distinct 聚合 在 distinct 聚合上使用 FILTER 修饰符 关注我的公众号[宝哥大数据],更多干货 SQL ...

  8. ELK性能优化实战分析

    ###推荐阅读:https://www.jianshu.com/p/e51ba6866b84 一.背景介绍 近一年内对公司的 ELK 日志系统做过性能优化,也对 SkyWalking 使用的 ES 存 ...

  9. SQL性能优化案例分析

    这段时间做一个SQL性能优化的案例分析, 整理了一下过往的案例,发现一个比较有意思的,拿出来给大家分享. 这个项目是我在项目开展2期的时候才加入的, 之前一期是个金融内部信息门户, 里面有个功能是收集 ...

最新文章

  1. Asp.Net中MVC缓存详解
  2. opencl高斯源码整理
  3. Android模拟器无法上网问题
  4. java 当一个文本框有值时另一个文本框置灰_【农行DevOps进行时】基于PaaS的持续集成/持续交付实践 | IDCF...
  5. 15 错误边界与使用技巧
  6. EevExpress中XtraGrid常用方法
  7. springBoot修改代码不需要重启-热部署
  8. kali如何取得超级用户权限_微商如何取得好的口碑?好的口碑等于信任微商如何提高用户信任?...
  9. ajax发送html标识,如何在jQuery的.ajax函数中正确转义作为数据发送的html
  10. opencv车牌照识别
  11. php 连接芒果数据库,PHP MongoDB示例
  12. 如何创建自己的apt软件源
  13. 自由网络-去中心化分布式网络
  14. mysql 计算成功率_mysql数据统计技巧备忘录
  15. 微信小程序开发01 双线程模型:为什么小程序不用浏览器的线程模型?
  16. CSS样式自动换行(强制换行)
  17. Zhong__Docker安装和简单使用
  18. 渝粤题库 陕西师范大学 《教育法学》作业
  19. 15. 简单工资管理系统设计
  20. 关于setInterval只执行一次的原因

热门文章

  1. IOS开发学习之路--第一篇--TOM猫
  2. 国家普通话智能测试软件,国家普通话水平智能测试系统
  3. 联想记忆计算机网络,六种联想记忆方法详解
  4. ajax仿百度搜索效果,利用autocomplete.js实现仿百度搜索效果(ajax动态获取后端[C#]数据)...
  5. Linux SDIO WIFI Marvell8801/Marvell88w8801(三) --- Linux驱动以及组件的使用
  6. 千亿市值今天解禁 美团点评“心里没谱”
  7. 51单片机的直流电机PWM调速控制系统(附Proteus仿真+C程序等全套资料)
  8. Java语言实现小学数学练习
  9. c语言编写音乐播放器完整代码(mciSendString函数的使用方法,第一次使用Visual Studio 2019的详细步骤)
  10. 淘宝视频的跨模态检索