实战 | flink sql 实时 TopN

  • 1.背景篇
  • 2.难点剖析篇-此类指标建设、保障的难点
    • 2.1.数据建设
    • 2.2.数据保障
    • 2.3.数据服务保障
  • 3.数据建设篇-具体实现方案详述
    • 3.1.整体数据服务架构
    • 3.2.flink 方案设计
    • 3.3.数据源
    • 3.4 数据汇
    • 3.5.数据建设
      • 方案1、内层 rownum + 外层自定义 udf
      • 方案2、自定义 udf
    • 3.6.高可用、高性能
      • 3.6.1.整体高可用保障
      • 3.6.2.大流量、高性能
      • 3.6.3.缩减状态大小
  • 4.数据服务篇-数据服务选型
    • 4.1.kv 存储
    • 4.2.OLAP
  • 5.数据保障篇-数据时效监控以及保障方案
  • 6.效果篇-上述方案最终的效果
    • 6.1.输出结果示例
    • 6.2.应用产品示例

来源:微博

1.背景篇

根据微博目前站内词条消费情况,计算 top 50 消费热度词条,每分钟更新一次,并且按照列表展现给用户。

这类指标可以统一划分到 topN 类别的指标中。即输入是具体词条消费日志,输出是词条消费排行榜。

预期效果如下。

2.难点剖析篇-此类指标建设、保障的难点

2.1.数据建设

难点

  1. 榜单类的指标有一个特点,就是客户端获取到的数据必须是同一分钟当时的词条消费热度,这就要求我们产出的每一条数据需要包含 topN 中的所有数据。这样才能保障用户获取到的数据的一致性。

  2. flink 任务大状态:词条多,状态大;词条具有时效性,所以对于低热词条需要进行删除

  3. flink 任务大流量、高性能:数据源是全站的词条消费流量,得扛得住突发流量的暴揍

业界方案调研

Flink DataStream api 实时计算 topN 热榜

  • 优点:可以按照用户自定义逻辑计算排名,基于 watermark 推动整个任务的计算,具备数据可回溯性。

  • 缺点:开发成本高,而本期主要介绍 flink sql 的方案,这个方案可以供大家进行参考。

「结论:虽可实现,但并非 sql api 实现。」

Flink SQL api 实时计算 topN 热榜

  • 优点:用户理解、开发成本低

  • 缺点:只有排名发生变化的词条才会输出,排名未发生变化数据不会输出(后续会在「数据建设」模块进行解释),不能做到每一条数据包含目前 topN 的所有数据的需求。

「结论:不满足需求。」

结论

我们需要制定自己的 flink sql 解决方案,以实现上述需求。这也是本节重点要讲述的内容,即在「数据建设篇-具体实现方案详述」详细展开。

2.2.数据保障

难点

  • flink 任务高可用

  • 榜单数据可回溯性

业界方案调研

  • flink 任务高可用:宕机之后快速恢复;有异地多活热备链路可随时切换

  • 榜单数据可回溯性:任务失败之后,按照词条时间数据的进行回溯

2.3.数据服务保障

难点

  • 数据服务引擎高可用

  • 数据服务 server 高可用

业界方案调研

  • 数据服务引擎高可用:数据服务引擎本身的高可用,异地双活实现

  • 数据服务 server 高可用:异地双活实现;上游不更新数据,数据服务 server 模块也能查询出上一次的结果进行展示,至少不会什么数据都展示不了

3.数据建设篇-具体实现方案详述

3.1.整体数据服务架构

首先,我们最初的方案是如下图所示,单机房的服务端,但是很明显基本没有高可用保障。

我们本文主要介绍 flink sql 方案,所以下文先介绍 flink sql,后文 6.6 介绍各种高可用、高性能优化及保障。

3.2.flink 方案设计

从本节开始,正式介绍 flink sql 相关的方案设计。

我们会从以下三个角度去介绍:

  1. 数据源:了解数据源的 schema

  2. 数据汇:从数据应用角度出发设计数据汇的 schema

  3. 数据建设:从数据源、数据汇从而推导出我们要实现的 flink sql 方案

3.3.数据源

数据源即安装在各位的手机微博客户端上报的用户消费明细日志,即用户消费一次某个词条,就会上报一条对应的日志。

字段名 备注
user_id 消费词条的用户
热搜词条_name 消费词条名称
timestamp 消费词条时间戳

3.4 数据汇

最开始设计的 schema 如下:

字段名 字段类型 备注
timestamp bigint 当前分钟词条时间戳
热搜词条_name string 词条名
rn bigint 排名 1 - 50

但是排名展示时,需要将这一分钟的前 50 名的数据全部查询到展示。而 flink 任务输出排名数据到外部存储时,保障前 50 名的词条数据事务性的输出(要么同时输出到数据服务中,要么一条也不输出)是一件比较复杂事情。

所以我们索性将前 50 名的数据全部收集到同一条数据当中,时间戳最新的一条数据就是最新的结果数据。

重新设计的 schema 如下:

字段名 字段类型 备注
timestamp bigint 当前分钟词条时间戳
热搜榜单 string 热搜榜单,schema 如 {“排名第一的词条1” : “排名第一的词条消费量”, “排名第二的词条1” : “排名第二的词条消费量”, “排名第三的词条1” : “排名第三的词条消费量”…} 前 50 名

3.5.数据建设

方案1、内层 rownum + 外层自定义 udf

  1. 从排名的角度出发,自然可以想到 「rownum」 进行排名(阿里云也有对应的实现案例)

  2. 最终要把排行榜合并到一条数据进行输出,那就必然会涉及到「自定义 udf」 将排名数据进行合并

sql

INSERT INTOtarget_db.target_table
SELECTmax(timestamp) AS timestamp,热搜_top50_json(热搜词条_name, cnt) AS data -- 外层 udaf 将所有数据进行 merge
FROM(SELECT热搜词条_name,cnt,timestamp,row_number() over(PARTITION by 热搜词条_name ORDER BY cnt ASC) AS rn -- 内层 rownum 进行排名FROM(SELECT热搜词条_name,count(1) AS cnt,max(timestamp) AS timestampFROMsource_db.source_tableGROUP BY热搜词条_name-- 如果有热点词条导致数据倾斜,可以加一层打散层))
WHERErn <= 100
GROUP BY0;

udf

  • udaf 开发参考:https://www.alibabacloud.com/help/zh/doc-detail/69553.htm?spm=a2c63.o282931.b99.244.4ad11889wWZiHL

  • top50_udaf:作用是将已经经过上游处理的消费量排前 100 名词条拿到进行排序后,合并成一个 top50 排行榜 json 字符串产出。

  • Accumulator:由需求可以知道,当前 udaf 是为了计算前 50 名的消费词条,所以 Accumulator 应该存储截止当前时间按照消费 cnt 数排名的前 100 名的词条。我们由此就可以想到使用 「最小堆」 来当做 Accumulator,Accumulator 中只存储消费 cnt 前 100 的数据。

  • 最小堆的实现

topN 设计伪代码如下:

public class 热搜_top50_json extends AggregateFunction<Map<String, Long>, TopN<Pair<String, Long>>> {@Overridepublic TopN<Pair<String, Long>> createAccumulator() {// 创建 acc -> 最小堆实现的 Top 50}@Overridepublic String getValue(TopN<Pair<String, Long>> acc) {// 1.将最小堆 acc 中列表数据拿到// 2.然后将列表按照从大到小进行排序// 3.产出结果数据}public void accumulate(TopN<Pair<String, Long>> acc, String 词条名称, long cnt) {// 1.获取到当前最小堆中的最小值// 如果当前词条的消费量 cnt 小于最小堆的堆顶// 则直接进行过滤// 2.如果最小堆中不存在当前词条// 则直接将当前词条放入最小堆中// 3.如果最小堆中已经存在当前词条存在// 那么将最小堆中这个词条的消费 cnt 与// 当前词条的 cnt 作比较,将大的那个放入最小堆中}public void retract(TopN<Pair<String, Long>> acc, String id, long cnt) {// 不需要实现 retract 方法// 由于 topn 具有特殊性:即我们只取每一个词条的最大值// 进行排名,所以可以不需要实现 retract 方法// 比较排名都在 accumulate 方法中已经实现完成}
}

小结

上述 udf 最好设计成一个固定大小排行榜的 udf,比如一个 udf 实现类就只能用于处理一个固定大小的排行;

sql 内层计算的排行榜大小一定要比 sql 外层(聚合)排行榜大小。举反例:假如内层计算前 30 名,外层计算前 50 名,内层 A 分桶第 31 名可能比 B 分桶第 1 名的值还大,但是 A 桶的第 31 名就不会被输出。反之则正确。

flink-conf.yaml 参数配置

由于上述 sql 是在无限流上的操作,所以上游数据每更新一次都会向下游发送一次 retract 消息以及最新的数据的消息进行计算。

那么就会存在这样一个问题,即 source qps 为 x 时,任务内的吞吐就为 x * n 倍,sink qps 也为 x,这会导致性能大幅下降的同时也会导致输出结果数据量非常大。

而我们只需要每分钟更新一次结果即可,所以可以使用 flink sql 自带的 minibatch 参数来控制输出结果的频次。

table.exec.mini-batch.enabled : true
# minibatch 是下面两个任意一个符合条件就会起触发计算
# 60s 一次
table.exec.mini-batch.allow-latency : 60 s
# 数量达到 10000000000 触发一次
# 设置为 10000000000 是为了让上面的 allow-latency 触发,每 60s 输出一次来满足我们的需求
table.exec.mini-batch.size : 10000000000

状态过期,如果不设置的话,词条状态会越来越大,对非高热词条进行清除。

# 设置 1 天的 ttl,如果一天过后
# 这个词条还没有更新,则直接删除
table.exec.state.ttl : 86400 s

方案2、自定义 udf

sql

INSERT INTO target_db.target_table
SELECTmax(timestamp) AS timestamp,-- udf 计算每一个分桶的前 100 名列表热搜_top50_json(cast(热搜词条_name AS string), cnt) AS bucket_top100
FROM(SELECT热搜词条_name AS 热搜词条_name,count(1) AS cnt,max(timestamp) AS timestampFROMsource_db.source_tableGROUP BY热搜词条_name-- 如果有热点词条导致数据倾斜-- 可以加一层打散层)
GROUP BY0
-- 由于这里是 group by 0
-- 所以可能会到导致热点,所以如果需要也可以加一层打散层
-- 在内部先算 top50,在外层将内部分桶的 top50 榜单进行 merge

3.6.高可用、高性能

3.6.1.整体高可用保障

异地双链路热备如下图:

可能会发现图中有异地机房,但是我们目前只画出了 A 地区机房的数据链路,B 地区机房还没有画全,接着我们一步一步将这个图进行补全。

异地双机房只是双链路的热备的一种案例。如果有同城双机房、双集群也可进行同样的服务部署。

为什么说异地机房的保障能力 > 同城异地机房 > 同城同机房双集群容灾能力?

同城同机房:只要这个机房挂了,即使你有两套链路也没救。

同城异地机房:很小几率情况会同城异地两个机房都挂了。。

异地机房:几乎不可能同时异地两个机房都被炸了。。。

数据源日志高可用

数据源日志 server 服务高可用:异地机房,当一个机房挂了之后,在客户端可以自动将日志发送到另一个机房的 webserver

数据源日志 kafka 服务高可用:kafka 使用异地机房 topic,其实就是两个 topic,每个机房一个 topic,两个 topic 互为热备,producer 在向下游两个机房的 topic 写数据时,可以将 50% 的流量写入一个机房,另外 50% 的流量写入另一个机房,一旦一个机房的 kafka 集群宕机,则 producer 端可以自动将 100% 的流量切换到另一个机房的 kafka。

正常情况下如图所示:

当发生 A 地机房 webserver 宕机时,客户端自动切换上报日志至 B 地机房 webserver。如下图所示:

kafka 也相同。如下图所示:


flink 任务高可用

flink 任务以 A 地机房做主链路,B 地机房启动相同的任务做热备双跑链路。

当 A 地机房 flink 任务宕机且无法恢复时,则 B 地机房的任务做热备替换。

正常情况下如图所示:


当 A 地机房 flink 任务宕机且无法恢复时,热备链路 flink 任务就可以顶上。如下图所示:


数据服务高可用

正常情况如下:


当 A 地 OLAP 或者 KV 存储挂了之后,webserver 可以自动切换至 B 地 OLAP 或者 KV 存储。如下图所示:


当 A 地 webserver 挂了之后,客户端可以自动拉取 B 地 webserver 数据,如下图所示:

3.6.2.大流量、高性能

数据源

数据源、汇反序列化性能提升:静态反序列化性能 > 动态反序列化性能。

举例 ProtoBuf。可以在 source 端先进行代码生成,然后用生成好的代码去反序列化源消息的性能会远好于使用 ProtoBuf Dynamic Message。

3.6.3.缩减状态大小

将状态中的 string 长度做映射之后变小

如果要计算 uv,可以将 string 类的 id 转换为 long 类型

rocksdb 增量 checkpoint,减小任务做 checkpoint 的压力

4.数据服务篇-数据服务选型

4.1.kv 存储

根据我们上述设计的数据汇 schema 来看,最适合存储引擎就是 kv 引擎,因为前端只需要展示最新的排行榜数据即可。所以我们可以使用 redis 等 kv 存储引擎来存储最新的数据。

4.2.OLAP

如果用户有需求需要记录上述数据的历史记录,我们也可以使用时序数据库或者 OLAP 引擎直接进行存储。

5.数据保障篇-数据时效监控以及保障方案

数据时效监控以及保障方案

6.效果篇-上述方案最终的效果

6.1.输出结果示例

{"黄子韬  杨紫是我哥们": 1672825,"延乔墓前的来信破防了": 1087416,"孟子义 张翰同学站起来": 747703// ...
}

6.2.应用产品示例

实战 | flink sql 实时 TopN相关推荐

  1. Flink SQL 实时大屏(实时查询存量数据-批转流)

    最近接到一个需求,关于flink实时大屏需求.每半小时展示历史每天当前半小时(每天00:00:00-00:30:00之间) 的数据的最大值.最小值.中位数.上四分位数.下四分位数. 需求描述 每半小时 ...

  2. Flink SQL 实时计算UV指标

    用一个接地气的案例来介绍如何实时计算 UV 数据.大家都知道,在 ToC 的互联网公司,UV 是一个很重要的指标,对于老板.商务.运营的及时决策会产生很大的影响,笔者在电商公司,目前主要的工作就是计算 ...

  3. Flink SQL高效Top-N方案的实现原理

    Top-N是我们应用Flink进行业务开发时的常见场景,传统的DataStream API已经有了非常成熟的实现方案,如果换成Flink SQL,又该怎样操作?好在Flink SQL官方文档已经给出了 ...

  4. Flink SQL 功能解密系列 —— 流式 TopN 挑战与实现

    TopN 是统计报表和大屏非常常见的功能,主要用来实时计算排行榜.流式的 TopN 不同于批处理的 TopN,它的特点是持续的在内存中按照某个统计指标(如出现次数)计算 TopN 排行榜,然后当排行榜 ...

  5. flink sql实战案例

    目录 一.背景 二.流程 三.案例 1.flink sql读取 Kafka 并写入 MySQL source sink insert 2.flinksql读kafka写入kudu source sin ...

  6. flink sql 知其所以然(二)| 自定义 redis 数据维表(附源码)

    感谢您的关注  +  点赞 + 再看,对博主的肯定,会督促博主持续的输出更多的优质实战内容!!! 1.序篇-本文结构 背景篇-为啥需要 redis 维表 目标篇-做 redis 维表的预期效果是什么 ...

  7. 大数据分析实战之项目实践:使用DLI Flink SQL进行电商实时业务数据分析

    使用 DLI Flink SQL 进行电商实时业务数据分析 业务场景介绍 场景描述 场景方案 场景任务 数据说明 数据源表:电商业务订单详情宽表 结果表:各渠道的销售总额实时统计表 操作过程 实操过程 ...

  8. Flink最锋利的武器:Flink SQL入门和实战 | 附完整实现代码

    作者 | 机智的王知无 转载自大数据技术与架构(ID: import_bigdata) 一.Flink SQL 背景 Flink SQL 是 Flink 实时计算为简化计算模型,降低用户使用实时计算门 ...

  9. flink sql udf jar包_flink教程flink 1.11 集成zeppelin实现简易实时计算平台

    背景 zeppelin不提供per job模式 实时平台开发周期长 基于zeppelin开发一个简易实时平台 开发zeppelin Interpreter 提交sql任务 提交jar任务 背景 随着f ...

最新文章

  1. Robotium测试没有源码的apk--需重签名apk
  2. ubuntu 如何右上角显示键盘
  3. vim编辑二进制文件
  4. Java int -1无符号右移_java中的无符号右移
  5. 【Java】我的第一个 JAVA 程序:Hello,world!
  6. 7a系列mrcc xilinx_artix-7A200T的输入时钟(50M)管脚接到MRCC的N端了,怎么解决?
  7. php ajax 增删改查 分页,Jquery之Ajax_分页及增删改查
  8. 医院药品管理系统java sql_医院药品管理系统设计(Netbeans,Myeclipse,MySQL,SQLServer)
  9. 自动驾驶毫米波雷达物体检测技术-算法
  10. 深度学习 | 《深度学习》“花书”知识点笔记
  11. 各地的公安接口的配置说明书
  12. Zemax操作--2(单透镜和双胶合透镜优化)
  13. 增长研究:电子烟巨头JUUL未公开的增长启示
  14. 发动机冒黑烟_汽车发动机冒黑烟的原因与处理方法
  15. 五万美元的年薪是如何花光的
  16. 3D数据基础——向量介绍与3D向量类的实现
  17. C++的学习路线以及未来就业趋势
  18. 打印出ntdll.dll中所有函数名字和地址
  19. 创建vue项目的时候报错:Skipped git commit due to missing username and email in git config.
  20. python爬虫(6)——Selenium的使用

热门文章

  1. 英文中 vi和vt的区别
  2. 【机器人小游戏---html(附源代码)】
  3. docker命令,一些常用的docker命令
  4. 贪心算法——埃及分数问题
  5. 零基础也能看懂的五大网络安全技术,学网络安全真的可以很简单
  6. IP、网关、端口、网段、子网掩码概念区别
  7. 《深度学习入门——基于Python的理论与实现》笔记
  8. 以太坊中的账户、交易、Gas和区块Gas Limit等基本概念
  9. Nodejs正则表达式
  10. 微信小程序中使用Echarts(折线图)