微博机器学习平台使用 Flink 实时处理用户行为日志和生成标签,并且在生成标签后写入存储系统。为了降低存储系统的 IO 负载,有批量写入的需求,同时对数据延迟也需要进行一定的控制,因此需要一种有效的消息聚合处理方案。

在本篇文章中我们将详细介绍 Flink 中对消息进行聚合处理的方案,描述不同方案中可能遇到的问题和解决方法,并进行对比。

基于 flatMap 的解决方案

这是我们能够想到最直观的解决方案,即在自定义的 flatMap 方法中对消息进行聚合,伪代码如下:

对应的作业拓扑和运行状态如下:


该方案的优点如下:

  1. 逻辑简单直观,各并发间负载均匀。
  2. flatMap 可以和上游算子 chain 到一起,减少网络传输开销。
  3. 使用 operator state 完成 checkpoint,支持正常和改并发恢复。

与此同时,由于使用 operator state,因此所有数据都保存在 JVM 堆上,当数据量较大时有 GC/OOM 风险。

使用 Count Window 的解决方案

对于大规模 state 数据,Flink 推荐使用 RocksDB backend,并且只支持在 KeyedStream 上使用。与此同时,KeyedStream 支持通过 Count Window 来实现消息聚合,因此 Count Window 成为第二个可选方案。

由于需要使用 KeyedStream,我们面临的第一个问题就是如何生成 key。一个比较自然的想法是直接使用随机数,伪代码示例如下:

对应的作业拓扑如下:

然而实际上线测试时出现了数据倾斜,不同并发间会出现负载不均,部分 task 接收不到数据从而 TPS 为 0:

在我们的场景下,除了有批量写入降低 IO 的需求,对数据延迟也需要控制,当 key set 太大时,每个 key 累积指定数据条数的时间将增加,会导致数据写入的延迟增大,因此我们需要控制 key set 的大小。经过分析,当 key set 较小时,Flink 默认的数据分发策略在并发间分布不均,从而导致了上述数据倾斜的问题。下面我们从源码级别对此进行说明。

首先,Flink 为了保证从 state 中恢复数据时产生最小的 IO,引入了 key group 的概念。Key group 数目等于最大并发数(max parallelism),取值范围是 128-32768。当做数据分发的时候,key 会按照规则被分发到 key group 里面,相关代码如下所示:

keyGroup->KeyGroupRangeAssignment.assignToKeyGroup(key,maxParallelism);

然后,key group 会按照规则被分发到每个 task 上,代码示例如下:

Task->String.valueOf(KeyGroupRangeAssignment.computeOperatorIndexForKeyGroup(maxParallelism, parallelism, keyGroup));

通过 debug 可以发现,当 key 的数量较小时,该分发策略会导致 key 在 task 之间分配不均匀,测试代码如下:

输出结果如下:

{0=4, 1=4, 2=1}
{0=651, 1=686, 2=710}

可以看到,当只有 10 个 key 时,并发间分布很不均匀;但当 key 的数量增加到 2048 时,就相对均匀了。

在了解了 key 的分发策略之后,我们可以相应的调整 key 的生成规则,来达到指定并发度和 key set 大小前提下的数据均匀,如下述代码所示:


我们利用 maxParallelism 和 parallelism 生成 key,并将其存储到一个大小为 parallelism 的 map 里,以 taskid 作为 map key ,每个 task 对应的 key list 作为 value,来保证每个 taskid 对应的 list 都存储了相同数量的 key。

最后,再将 map 打平,存储到一个数组里。在使用的时候,我们可以从该数组里随机取数来作为key,就能达到平均分配的目的了。

该方案的执行效果如下:

可以看到数据倾斜的问题得以解决,每个任务的负载都比较均匀。但需要注意的是由于引入了 key by,因此会有数据 shuffle,对比 flatmap 方案会有额外的网络开销。另外由于生成 key 的规则和实际并发度有关,因此该方案不支持改并发恢复,或者说如果修改并发,那么在 restore 的时候会发生数据错乱的问题,这一点需要尤为注意。

方案对比和总结

最后我们将两种解决方案的优缺点对比总结如下:

在数据量不大且内存充足的情况下,建议使用 flatmap 方案;在数据量较大且可以保证不修改并发的情况下,建议使用 count window 方案并使用 RocksDB 进行 state数据存储;在数据量较大且需要修改并发的情况下,当前给出的两种方案都无法解决,需要寻求新的解决方案。

作者介绍:

曹富强、张颖,微博机器学习研发中心-系统工程师。现负责微博机器学习平台数据计算模块,主要涉及实时计算 Flink、Storm、Spark Streaming,离线计算 Hive、Spark 等。目前专注于 Flink 在微博机器学习场景的应用。

原文链接
本文为云栖社区原创内容,未经允许不得转载。

Flink 消息聚合处理方案相关推荐

  1. 如何基于RocketMQ设计一套全链路消息不丢失方案?

    我们使用MQ作为消息中间件,传输一些消息的时候,必须考虑到消息丢失的可能.因为有的时候消息丢失了,会产生很严重的后果,比如消息计费数据,跟钱有关的消息. 这篇文章我们以RocketMQ为例来讲解,如何 ...

  2. 分布式事务——消息最终一致性方案

    前言 分布式事务一直是服务化拆分后一个绕不开的话题,原来在单体应用中执行的多个逻辑操作,现在被拆分成了多个服务之间的远程调用.虽然服务化为我们的系统带来了水平伸缩的能力,然而随之而来挑战就是分布式事务 ...

  3. nosql简答什么是最终一致性_可靠消息最终一致性方案中预发送作用是什么

    可靠消息最终一致性方案的核心流程 ①上游服务投递消息 如果要实现可靠消息最终一致性方案,一般你可以自己写一个可靠消息服务,实现一些业务逻辑. 首先,上游服务需要发送一条消息给可靠消息服务.这条消息说白 ...

  4. Rabbitmq消息中心_消息中心总体方案

    消息中心方案 一.消息中心简介 为了将各个应用系统之间进行业务解耦,对业务的透明化处理及技术架构的统一管理,方便对各应用的整体把控,保证系统的稳定性,也方便各应用的消息中间件的快速搭建,因此搭建消息中 ...

  5. 密码学归约证明——基于伪随机函数的消息鉴别码方案

    1. 消息鉴别码实验 运行得到密钥,即. 敌手获得输入,且能够访问预言机,最终输出,其中为消息的鉴别码.设为敌手访问预言机的问询集合. 当且仅当且,实验输出1.其中是鉴别码方案的验证方法. 如果一个消 ...

  6. flink on k8s部署方案实践--详细步骤

    背景 Flink-operator极大的方便了我们管理 Flink 集群及其作业,我们只需要自定义yaml文件就可以做到. Flink 官方还未给出 flink-operator 方案,不过 Goog ...

  7. 延时消息常见实现方案

    前言 延时消息(定时消息)指的在分布式异步消息场景下,生产端发送一条消息,希望在指定延时或者指定时间点被消费端消费到,而不是立刻被消费. 延时消息适用的业务场景非常的广泛,在分布式系统环境下,延时消息 ...

  8. uni-app消息推送方案

    一.引言 uni-app是支持消息推送的,参考如下文档: UniPush介绍 UniPush使用指南 UniPush开通指南 如何自定义推送通知的图标? 在 uni-app 中使用 UniPush 二 ...

  9. RocketMQ(九):rocketMQ设计的全链路消息零丢失方案?+rocketmq消息中间件事务消息机制的底层实现原理?+half是什么?+half消息是如何对消费者不可见的?

    前言: 目前rocketmq更新已经更新了11篇博客了,预计接下来的2-3篇是暂时的更新进度了,准备更新一下springboot或者是jvm,mysql相关的专题出来,后续更新完事后,再分享一些实战性 ...

最新文章

  1. 阿里当 PM 需要做什么?程序媛的亲身经历告诉你!| 程序员有话说
  2. JavaScript DOM操作表格及样式
  3. 科普:目标检测Anchor是什么?怎么科学设置?[附代码]
  4. 深圳内推 | 腾讯AI Lab自然语言处理中心招聘NLP研究型实习生
  5. c++ 不插入重复元素但也不排序_【每日一题】125. 对链表进行插入排序
  6. 多线程数据下载(akshare)
  7. java中的starts_Java Math类静态double nextAfter(double starts,double direction)示例
  8. 【数值分析】证明题一道
  9. IConfiguration的命令行解析
  10. 只腐蚀毛刺 腐蚀算法_去毛刺,这些方法更专业一点~
  11. java-集合(三)
  12. 脑子好,蹦两下!--程序员应该玩的小游戏
  13. pymongo访问数据前数据库名和集合名(表名)校验
  14. 蓝桥杯“基础练习: 十六进制转十进制
  15. Matplotlib--legend函数
  16. Java中的final、static、this、super 关键字
  17. 51单片机实现计算器程序
  18. 六级(2020/12-2) Section B
  19. MES系统在汽车零部件行业的应用
  20. Win7(32bit) + VS2012 + Qt

热门文章

  1. 论程序员如何规划职业路线?网友:从码农到工程师?
  2. python低代码_几行代码搞定ML模型,低代码机器学习Python库正式开源
  3. 计算机操作系统英文版课后答案,计算机操作系统(第3版)课后习题答案(完整版)...
  4. 手机1像素线粗_关于移动端一像素线的解决方案
  5. python利用tensorflow识别圆_RaspberryPi上实现佩戴口罩识别——2020电赛F题小记
  6. python怎么画参数函数图像_详解pandas.DataFrame.plot() 画图函数
  7. java 如何导出json文件_java导出json格式文件的示例代码
  8. matlab 等高线_MATLAB作图实例:39:更改等高线图的填充颜色
  9. python 字典添加元素乱序了_Python有序字典的两个小“惊喜”
  10. 多次执行sql 后卡住_解Bug之路记一次中间件导致的慢SQL排查过程