实战 | flink sql 实时 TopN
实战 | flink sql 实时 TopN
- 1.背景篇
- 2.难点剖析篇-此类指标建设、保障的难点
- 2.1.数据建设
- 2.2.数据保障
- 2.3.数据服务保障
- 3.数据建设篇-具体实现方案详述
- 3.1.整体数据服务架构
- 3.2.flink 方案设计
- 3.3.数据源
- 3.4 数据汇
- 3.5.数据建设
- 方案1、内层 rownum + 外层自定义 udf
- 方案2、自定义 udf
- 3.6.高可用、高性能
- 3.6.1.整体高可用保障
- 3.6.2.大流量、高性能
- 3.6.3.缩减状态大小
- 4.数据服务篇-数据服务选型
- 4.1.kv 存储
- 4.2.OLAP
- 5.数据保障篇-数据时效监控以及保障方案
- 6.效果篇-上述方案最终的效果
- 6.1.输出结果示例
- 6.2.应用产品示例
来源:微博
1.背景篇
根据微博目前站内词条消费情况,计算 top 50
消费热度词条,每分钟更新一次,并且按照列表展现给用户。
这类指标可以统一划分到 topN 类别的指标中。即输入是具体词条消费日志,输出是词条消费排行榜。
预期效果如下。
2.难点剖析篇-此类指标建设、保障的难点
2.1.数据建设
难点
榜单类的指标有一个特点,就是客户端获取到的数据必须是同一分钟当时的词条消费热度,这就要求我们产出的每一条数据需要包含 topN 中的所有数据。这样才能保障用户获取到的数据的一致性。
flink 任务大状态:词条多,状态大;词条具有时效性,所以对于低热词条需要进行删除
flink 任务大流量、高性能:数据源是全站的词条消费流量,得扛得住突发流量的暴揍
业界方案调研
Flink DataStream api 实时计算 topN 热榜
优点:可以按照用户自定义逻辑计算排名,基于 watermark 推动整个任务的计算,具备数据可回溯性。
缺点:开发成本高,而本期主要介绍 flink sql 的方案,这个方案可以供大家进行参考。
「结论:虽可实现,但并非 sql api 实现。」
Flink SQL api 实时计算 topN 热榜
优点:用户理解、开发成本低
缺点:只有排名发生变化的词条才会输出,排名未发生变化数据不会输出(后续会在「数据建设」模块进行解释),不能做到每一条数据包含目前
topN
的所有数据的需求。
「结论:不满足需求。」
结论
我们需要制定自己的 flink sql 解决方案,以实现上述需求。这也是本节重点要讲述的内容,即在「数据建设篇-具体实现方案详述」详细展开。
2.2.数据保障
难点
flink 任务高可用
榜单数据可回溯性
业界方案调研
flink 任务高可用:宕机之后快速恢复;有异地多活热备链路可随时切换
榜单数据可回溯性:任务失败之后,按照词条时间数据的进行回溯
2.3.数据服务保障
难点
数据服务引擎高可用
数据服务 server 高可用
业界方案调研
数据服务引擎高可用:数据服务引擎本身的高可用,异地双活实现
数据服务 server 高可用:异地双活实现;上游不更新数据,数据服务 server 模块也能查询出上一次的结果进行展示,至少不会什么数据都展示不了
3.数据建设篇-具体实现方案详述
3.1.整体数据服务架构
首先,我们最初的方案是如下图所示,单机房的服务端,但是很明显基本没有高可用保障。
我们本文主要介绍 flink sql 方案,所以下文先介绍 flink sql,后文 6.6 介绍各种高可用、高性能优化及保障。
3.2.flink 方案设计
从本节开始,正式介绍 flink sql 相关的方案设计。
我们会从以下三个角度去介绍:
数据源:了解数据源的 schema
数据汇:从数据应用角度出发设计数据汇的 schema
数据建设:从数据源、数据汇从而推导出我们要实现的 flink sql 方案
3.3.数据源
数据源即安装在各位的手机微博客户端上报的用户消费明细日志,即用户消费一次某个词条,就会上报一条对应的日志。
字段名 | 备注 |
---|---|
user_id | 消费词条的用户 |
热搜词条_name | 消费词条名称 |
timestamp | 消费词条时间戳 |
… | … |
3.4 数据汇
最开始设计的 schema 如下:
字段名 | 字段类型 | 备注 |
---|---|---|
timestamp | bigint | 当前分钟词条时间戳 |
热搜词条_name | string | 词条名 |
rn | bigint | 排名 1 - 50 |
但是排名展示时,需要将这一分钟的前 50 名的数据全部查询到展示。而 flink 任务输出排名数据到外部存储时,保障前 50 名的词条数据事务性的输出(要么同时输出到数据服务中,要么一条也不输出)是一件比较复杂事情。
所以我们索性将前 50 名的数据全部收集到同一条数据当中,时间戳最新的一条数据就是最新的结果数据。
重新设计的 schema 如下:
字段名 | 字段类型 | 备注 |
---|---|---|
timestamp | bigint | 当前分钟词条时间戳 |
热搜榜单 | string | 热搜榜单,schema 如 {“排名第一的词条1” : “排名第一的词条消费量”, “排名第二的词条1” : “排名第二的词条消费量”, “排名第三的词条1” : “排名第三的词条消费量”…} 前 50 名 |
3.5.数据建设
方案1、内层 rownum + 外层自定义 udf
从排名的角度出发,自然可以想到 「
rownum
」 进行排名(阿里云也有对应的实现案例)最终要把排行榜合并到一条数据进行输出,那就必然会涉及到「
自定义 udf
」 将排名数据进行合并
sql
INSERT INTOtarget_db.target_table
SELECTmax(timestamp) AS timestamp,热搜_top50_json(热搜词条_name, cnt) AS data -- 外层 udaf 将所有数据进行 merge
FROM(SELECT热搜词条_name,cnt,timestamp,row_number() over(PARTITION by 热搜词条_name ORDER BY cnt ASC) AS rn -- 内层 rownum 进行排名FROM(SELECT热搜词条_name,count(1) AS cnt,max(timestamp) AS timestampFROMsource_db.source_tableGROUP BY热搜词条_name-- 如果有热点词条导致数据倾斜,可以加一层打散层))
WHERErn <= 100
GROUP BY0;
udf
udaf 开发参考:https://www.alibabacloud.com/help/zh/doc-detail/69553.htm?spm=a2c63.o282931.b99.244.4ad11889wWZiHL
top50_udaf:作用是将已经经过上游处理的消费量排前 100 名词条拿到进行排序后,合并成一个 top50 排行榜 json 字符串产出。
Accumulator:由需求可以知道,当前 udaf 是为了计算前 50 名的消费词条,所以 Accumulator 应该存储截止当前时间按照消费 cnt 数排名的前 100 名的词条。我们由此就可以想到使用 「最小堆」 来当做 Accumulator,Accumulator 中只存储消费 cnt 前 100 的数据。
最小堆的实现
topN 设计伪代码如下:
public class 热搜_top50_json extends AggregateFunction<Map<String, Long>, TopN<Pair<String, Long>>> {@Overridepublic TopN<Pair<String, Long>> createAccumulator() {// 创建 acc -> 最小堆实现的 Top 50}@Overridepublic String getValue(TopN<Pair<String, Long>> acc) {// 1.将最小堆 acc 中列表数据拿到// 2.然后将列表按照从大到小进行排序// 3.产出结果数据}public void accumulate(TopN<Pair<String, Long>> acc, String 词条名称, long cnt) {// 1.获取到当前最小堆中的最小值// 如果当前词条的消费量 cnt 小于最小堆的堆顶// 则直接进行过滤// 2.如果最小堆中不存在当前词条// 则直接将当前词条放入最小堆中// 3.如果最小堆中已经存在当前词条存在// 那么将最小堆中这个词条的消费 cnt 与// 当前词条的 cnt 作比较,将大的那个放入最小堆中}public void retract(TopN<Pair<String, Long>> acc, String id, long cnt) {// 不需要实现 retract 方法// 由于 topn 具有特殊性:即我们只取每一个词条的最大值// 进行排名,所以可以不需要实现 retract 方法// 比较排名都在 accumulate 方法中已经实现完成}
}
小结
上述 udf 最好设计成一个固定大小排行榜的 udf,比如一个 udf 实现类就只能用于处理一个固定大小的排行;
sql 内层计算的排行榜大小一定要比 sql 外层(聚合)排行榜大小大
。举反例:假如内层计算前 30 名,外层计算前 50 名,内层 A 分桶第 31 名可能比 B 分桶第 1 名的值还大,但是 A 桶的第 31 名就不会被输出。反之则正确。
flink-conf.yaml 参数配置
由于上述 sql 是在无限流上的操作,所以上游数据每更新一次都会向下游发送一次 retract 消息以及最新的数据的消息进行计算。
那么就会存在这样一个问题,即 source qps 为 x 时,任务内的吞吐就为 x * n
倍,sink qps 也为 x,这会导致性能大幅下降的同时也会导致输出结果数据量非常大。
而我们只需要每分钟更新一次结果即可,所以可以使用 flink sql 自带的 minibatch 参数来控制输出结果的频次。
table.exec.mini-batch.enabled : true
# minibatch 是下面两个任意一个符合条件就会起触发计算
# 60s 一次
table.exec.mini-batch.allow-latency : 60 s
# 数量达到 10000000000 触发一次
# 设置为 10000000000 是为了让上面的 allow-latency 触发,每 60s 输出一次来满足我们的需求
table.exec.mini-batch.size : 10000000000
状态过期,如果不设置的话,词条状态会越来越大,对非高热词条进行清除。
# 设置 1 天的 ttl,如果一天过后
# 这个词条还没有更新,则直接删除
table.exec.state.ttl : 86400 s
方案2、自定义 udf
sql
INSERT INTO target_db.target_table
SELECTmax(timestamp) AS timestamp,-- udf 计算每一个分桶的前 100 名列表热搜_top50_json(cast(热搜词条_name AS string), cnt) AS bucket_top100
FROM(SELECT热搜词条_name AS 热搜词条_name,count(1) AS cnt,max(timestamp) AS timestampFROMsource_db.source_tableGROUP BY热搜词条_name-- 如果有热点词条导致数据倾斜-- 可以加一层打散层)
GROUP BY0
-- 由于这里是 group by 0
-- 所以可能会到导致热点,所以如果需要也可以加一层打散层
-- 在内部先算 top50,在外层将内部分桶的 top50 榜单进行 merge
3.6.高可用、高性能
3.6.1.整体高可用保障
异地双链路热备如下图:
可能会发现图中有异地机房,但是我们目前只画出了 A 地区机房的数据链路,B 地区机房还没有画全,接着我们一步一步将这个图进行补全。
异地双机房只是双链路的热备的一种案例。如果有同城双机房、双集群也可进行同样的服务部署。
为什么说异地机房的保障能力 > 同城异地机房 > 同城同机房双集群容灾能力?
同城同机房:只要这个机房挂了,即使你有两套链路也没救。
同城异地机房:很小几率情况会同城异地两个机房都挂了。。
异地机房:几乎不可能同时异地两个机房都被炸了。。。
数据源日志高可用
数据源日志 server 服务高可用:异地机房,当一个机房挂了之后,在客户端可以自动将日志发送到另一个机房的 webserver
数据源日志 kafka 服务高可用:kafka 使用异地机房 topic,其实就是两个 topic,每个机房一个 topic,两个 topic 互为热备,producer 在向下游两个机房的 topic 写数据时,可以将 50% 的流量写入一个机房,另外 50% 的流量写入另一个机房,一旦一个机房的 kafka 集群宕机,则 producer 端可以自动将 100% 的流量切换到另一个机房的 kafka。
正常情况下如图所示:
当发生 A 地机房 webserver 宕机时,客户端自动切换上报日志至 B 地机房 webserver。如下图所示:
kafka 也相同。如下图所示:
flink 任务高可用
flink 任务以 A 地机房做主链路,B 地机房启动相同的任务做热备双跑链路。
当 A 地机房 flink 任务宕机且无法恢复时,则 B 地机房的任务做热备替换。
正常情况下如图所示:
当 A 地机房 flink 任务宕机且无法恢复时,热备链路 flink 任务就可以顶上。如下图所示:
数据服务高可用
正常情况如下:
当 A 地 OLAP 或者 KV 存储挂了之后,webserver 可以自动切换至 B 地 OLAP 或者 KV 存储。如下图所示:
当 A 地 webserver 挂了之后,客户端可以自动拉取 B 地 webserver 数据,如下图所示:
3.6.2.大流量、高性能
数据源
数据源、汇反序列化性能提升:静态反序列化性能 > 动态反序列化性能。
举例 ProtoBuf。可以在 source 端先进行代码生成,然后用生成好的代码去反序列化源消息的性能会远好于使用 ProtoBuf Dynamic Message。
3.6.3.缩减状态大小
将状态中的 string 长度做映射之后变小
如果要计算 uv,可以将 string 类的 id 转换为 long 类型
rocksdb 增量 checkpoint,减小任务做 checkpoint 的压力
4.数据服务篇-数据服务选型
4.1.kv 存储
根据我们上述设计的数据汇 schema 来看,最适合存储引擎就是 kv 引擎,因为前端只需要展示最新的排行榜数据即可。所以我们可以使用 redis 等 kv 存储引擎来存储最新的数据。
4.2.OLAP
如果用户有需求需要记录上述数据的历史记录,我们也可以使用时序数据库或者 OLAP 引擎直接进行存储。
5.数据保障篇-数据时效监控以及保障方案
数据时效监控以及保障方案
6.效果篇-上述方案最终的效果
6.1.输出结果示例
{"黄子韬 杨紫是我哥们": 1672825,"延乔墓前的来信破防了": 1087416,"孟子义 张翰同学站起来": 747703// ...
}
6.2.应用产品示例
实战 | flink sql 实时 TopN相关推荐
- Flink SQL 实时大屏(实时查询存量数据-批转流)
最近接到一个需求,关于flink实时大屏需求.每半小时展示历史每天当前半小时(每天00:00:00-00:30:00之间) 的数据的最大值.最小值.中位数.上四分位数.下四分位数. 需求描述 每半小时 ...
- Flink SQL 实时计算UV指标
用一个接地气的案例来介绍如何实时计算 UV 数据.大家都知道,在 ToC 的互联网公司,UV 是一个很重要的指标,对于老板.商务.运营的及时决策会产生很大的影响,笔者在电商公司,目前主要的工作就是计算 ...
- Flink SQL高效Top-N方案的实现原理
Top-N是我们应用Flink进行业务开发时的常见场景,传统的DataStream API已经有了非常成熟的实现方案,如果换成Flink SQL,又该怎样操作?好在Flink SQL官方文档已经给出了 ...
- Flink SQL 功能解密系列 —— 流式 TopN 挑战与实现
TopN 是统计报表和大屏非常常见的功能,主要用来实时计算排行榜.流式的 TopN 不同于批处理的 TopN,它的特点是持续的在内存中按照某个统计指标(如出现次数)计算 TopN 排行榜,然后当排行榜 ...
- flink sql实战案例
目录 一.背景 二.流程 三.案例 1.flink sql读取 Kafka 并写入 MySQL source sink insert 2.flinksql读kafka写入kudu source sin ...
- flink sql 知其所以然(二)| 自定义 redis 数据维表(附源码)
感谢您的关注 + 点赞 + 再看,对博主的肯定,会督促博主持续的输出更多的优质实战内容!!! 1.序篇-本文结构 背景篇-为啥需要 redis 维表 目标篇-做 redis 维表的预期效果是什么 ...
- 大数据分析实战之项目实践:使用DLI Flink SQL进行电商实时业务数据分析
使用 DLI Flink SQL 进行电商实时业务数据分析 业务场景介绍 场景描述 场景方案 场景任务 数据说明 数据源表:电商业务订单详情宽表 结果表:各渠道的销售总额实时统计表 操作过程 实操过程 ...
- Flink最锋利的武器:Flink SQL入门和实战 | 附完整实现代码
作者 | 机智的王知无 转载自大数据技术与架构(ID: import_bigdata) 一.Flink SQL 背景 Flink SQL 是 Flink 实时计算为简化计算模型,降低用户使用实时计算门 ...
- flink sql udf jar包_flink教程flink 1.11 集成zeppelin实现简易实时计算平台
背景 zeppelin不提供per job模式 实时平台开发周期长 基于zeppelin开发一个简易实时平台 开发zeppelin Interpreter 提交sql任务 提交jar任务 背景 随着f ...
最新文章
- Robotium测试没有源码的apk--需重签名apk
- ubuntu 如何右上角显示键盘
- vim编辑二进制文件
- Java int -1无符号右移_java中的无符号右移
- 【Java】我的第一个 JAVA 程序:Hello,world!
- 7a系列mrcc xilinx_artix-7A200T的输入时钟(50M)管脚接到MRCC的N端了,怎么解决?
- php ajax 增删改查 分页,Jquery之Ajax_分页及增删改查
- 医院药品管理系统java sql_医院药品管理系统设计(Netbeans,Myeclipse,MySQL,SQLServer)
- 自动驾驶毫米波雷达物体检测技术-算法
- 深度学习 | 《深度学习》“花书”知识点笔记
- 各地的公安接口的配置说明书
- Zemax操作--2(单透镜和双胶合透镜优化)
- 增长研究:电子烟巨头JUUL未公开的增长启示
- 发动机冒黑烟_汽车发动机冒黑烟的原因与处理方法
- 五万美元的年薪是如何花光的
- 3D数据基础——向量介绍与3D向量类的实现
- C++的学习路线以及未来就业趋势
- 打印出ntdll.dll中所有函数名字和地址
- 创建vue项目的时候报错:Skipped git commit due to missing username and email in git config.
- python爬虫(6)——Selenium的使用