| 导语: 介绍下最近使用 Flink 来对计费数据进行去重的具体做法

一. 背景

AI 视觉产品在我们腾讯云-人工智能的产品目录下,包括人脸识别、人脸特效、人脸核身、图像识别、文字识别等。
流计算 Oceanus 在腾讯云-大数据的产品目录下,是基于 Apache Flink 构建的企业级实时大数据分析平台。
AI 视觉产品是按调用量计费,毕竟涉及到钱,用户对计量数据准确是非常敏感的; 另外调用量本身也比较大,如何保证数据的准确一致也是一个比较大的挑战。
数据不准: 主要包括数据丢失和数据重复(当然可能有其他问题比如上报的数据本身错误等,暂不属于本次讨论范围)。
数据丢失: 相当于调用量少算,会影响我们的收入。一方面我们通常重试、持久化等方式尽量减少数据的丢失,目标当然是完全不丢,但很难做到100%不丢。另一方面很少量的数据丢失对于实际收入影响很小,对用户基本没有影响。
数据重复: 相当于调用量多算就会多收用户钱,用户一旦发现肯定会投诉过来。所以是必须要去解决的,但是数据量很大,要做到精确去重比较难。

整体的背景和处理逻辑可以参考如下业务流程图, 本次主要介绍下我们在数据去重方面的一些尝试。

系统架构图:

二. 思路与调研

去重的触发时机: 数据重复的原因主要是各种重试:包括上游传输环节的超时重试和下游计算环节的系统重启导致的数据重算。因为我们通常使用的是最终的数据,只要保证最终数据不重复即可,所以只要在最后的计算环节进行一次去重就可以,前面的环节不用处理。

去重的技术手段: 保证数据处理中不重复、不丢失(数据一致),通常有 2 个技术手段:事务和幂等可重入(幂等重入可能出现部分数据插入了的时间段,没有事务还能保证过程中的精确,但如上所述我们只要最终数据一致,所以幂等也是可以的)。

事务的实现难度高,尤其在分布式或多个组件要用到 2PC 之类的事务,更加复杂;所以通常事务都是组件本身成熟的实现,很少从头开发的; 而幂等通常是使用数据的唯一键来保证去重,但是在我们数据累计这里不适用,因为聚合时的数据的顺序和数量在每次计算时不是固定的,所以如果出现重启要重新计算时并不能生成和上次一样的唯一键,就难以使用键去重。

经过调研发现 Flink 本身是支持 2PC 事务和内部的状态存储,可以做到 exactly-once,当然使用起来会有成本(包括学习成本、问题排查等,Flink 的开发入门和资料可以参见 Flink 入门 1-零基础用户实现简单 Flink 任务[2] 和 Flink 入门9-Jar 作业开发[3])。
考虑到后续我们数据量增加后的数据处理能力以及其他一些流处理的场景都还是会用到 Flink,所以与其自己 DIY 不如使用成熟开源的组件,也符合当前开源协同的趋势,所以决定直接使用 Flink 里面的去重,下面是 Flink 实现 2PC 的流程:

2PC 事务有 2 个角色:协调者(发起者、控制者)和参与者(要支持本地事务),如上图所示。
Flink 的 JobManager 是协调者;Flink 内部的状态、流程属于内部参与者;Kafka 作为 Source 和 Sink 是外部参与者,尤其是作为 Sink 的 Kafka 要选择支持事务的版本(>=0.11)。

Flink 介绍事务的可以参考 官方文档[4]。

Spark vs Flink 在 excatly-once 上的对比讨论,也说到很多基本概念的理解:https://zhuanlan.zhihu.com/p/77677075

三. 实现

基本的流程和上图一致,基本代码如下:

简单介绍下每个步骤,同时讨论下可能的问题:

0. 前提:

Flink 打开 Checkpoint,相当于 Flink 打开了 redo、undo log 等持久化的机制,是事务的基础。

1. 事务开始:

从上一个处理完成 offset 消费 Kafka 数据,当然这里 Kafka 里面的数据格式需要自行去解析,可以做些简单的处理。

2. 事务处理:

2.1 按照用户 KeyBy 分流,提高并发

按照用户分流可以保证同一个用户在同一个处理流中,从而保证数据去重(不同用户的数据认为不会重复的)。当然这里也有一个数据倾斜的问题:如果某个用户调用量特别大就会导致部分流负载很高,拖累这个处理的速度,目前我们的数据分布根据测试还好。通过学习了解到如果数据倾斜严重可以再次选择更好的 Key 分流:比如可以按照用户 RequestId 的前缀进行分流更均匀,另外 Flink 也提供了 rebalance 的接口强制将数据打散,当然要符合逻辑数据分布要求。

2.2 声明 map 状态存储处理过的数据,用于去重

测试时可以选 memory,但是线上还是要使用 Rocksdb 应对数据量大的场景,同时要开启 TTL 机制避免状态太大对内存和 Checkpoint 保存和恢复时产生太大的 IO 压力,开始时建议先选择比较短的 TTL,观察内存和负载再逐步调大,目前我们 TTL 可以到 15 分,希望可以逐步调大到 1 小时 - 10 小时。
当然当这里如果数据量特别大时,用到的存储也就很,很容易磁盘、内存产生大的压力,所以这里要进行实际的测试和调整:比如增多机器提高并发,或者使用 Rocksdb 增量式的 Checkpoint 等。
这里存储数据的时间长短决定了去重的数据的范围,如果太大如上所述对存储压力很大,造成 Flink 运行不稳定;但如果太小只能小局部去重,对于跨度比较大的数据重复不能应对,比如跨天的数据也可能重复,在离线上报的链路中就可能跨天重试的,通常在实时上报的链路不会出现,对于这种长时间还有重复的,目前想到有 2 个处理的方向(还没具体落地):

  1. 使用 Redis 存储处理过的数据(不要求很及时),上报时先去这里去重;问题首先是对存储压力增大不少,同时要增加一次查重的耗时
  2. 要求上报方记录下上报的结果不要重复上报,即使重复上报时间间隔也不能太长这里虽然对业务不友好,但是可以理解,毕竟极端情况下也有现在 30 号要从新上报 1 号的数据也可能出现,那如果用方案 1,就要 1 个月的数据完全存储下来成本太大。

2.2.1 去重 Key 的选择

通常来说直接选 RequestId 就可以,当然保险起见,加上用户维度也是可以的(可以应对下 RequestId 少量重复的情况)。但测试中发现几个问题:

  1. 用户一次的请求,到后台业务对应多次处理都上报给计费了,组合结果后返回给用户——这样就会导致只统计了其中一个操作,其他操作被去重过滤。解决的方法就是去重 Key 也加上 action。
  2. 接入层重试的情况,第一次请求处理失败上报了失败,然后接入层重试成功了上报了成——这样就会导致只统计了失败的,成功的上报被过滤了。解决的方法也是加上错误码。

3. 数据聚合:

3.1 目前 window 选择1分钟粒度聚合汇总

3.2 聚合 Key 根据业务需要进行选择

3.3 出库到 Kafka 时生成一个 uuid

uuid 是 java 自带的函数生成,相当于一个全局唯一随机数,好处就是有了唯一键,后面数据处理、入库时就很方便。

4. 输出到 Kafka:

目前 Flink 内置的支持事务的 Sink 只有 Kafka>0.11。当然可以根据 Flink 的 2PC (两阶段提交) 接口自行去实现需要后端的 Sink,比如 MySQL、PG。这里我们使用 Kafka 作为输出除了简单成熟,另外就是考虑到如果数据量增大,Kafka 这里的大数据能力就是天生的,数据库就需要扩容或替换——当然这里增加了一个 Kafka 到 PG 的同步的流程,流程变的更长了;但是考虑到后续数据量大和解耦的考虑,还是推荐出库到 Kafka。

下面是 Flink 2PC 的 Sink 要继承实现的接口:

public abstract class TwoPhaseCommitSinkFunction<IN, TXN, CONTEXT> extends RichSinkFunction<IN>implements CheckpointedFunction, CheckpointListener {// 开始一个事务,返回事务信息的句柄protected abstract TXN beginTransaction() throws Exception;// 预提交(即提交请求)阶段protected abstract void preCommit(TXN transaction) throws Exception;// 正式提交阶段protected abstract void commit(TXN transaction);// 取消事务protected abstract void abort(TXN transaction);
}

4.1 打开 Kafka 事务出库

如上所述,Flink 的 Kafka 连接器在流计算 Oceanus (Flink) 平台已经支持,可以直接使用。这里是事务出库到 Kafka 的,那么后续读取 Kafka 这里的数据也要配置 read_commited 的级别的读,整个链路数据一致。

四. 问题

在上面的流程说明中已经就每步可能的问题进行了说明和讨论,但是肯定还有新的问题,就需要后续运营过程中发现进行修复。这里我们预料比较麻烦的问题是:
如果 2PC 事务过程中出现异常问题时,是否可以比较快、完美的恢复回来;否则可能出现死锁或启动不起来的情况。
以上是最近 2-3 个月的实现的情况,后面还会继续验证、继续发现问题,所以还是要进一步的学习和理解 Flink 的底层机制,甚至可以进行代码级的贡献——这一步肯定非常难,短时间不可能完成,初期投入很多但是产出不多,但是可以肯定的是值得长期投入。
本文作为 Flink 应用的一次尝试,如发现有错误请直接指出,同时欢迎有相同需求的同学一起讨论。

五. 参考链接

[1] Flink 实践教程:入门 1-零基础用户实现简单 Flink 任务:https://cloud.tencent.com/developer/article/1895677
[2] Flink 实践教程:入门9-Jar 作业开发:https://cloud.tencent.com/developer/article/1907822
[3] Flink 事务介绍:https://flink.apache.org/features/2018/03/01/end-to-end-exactly-once-apache-flink.html

流计算 Oceanus 限量秒杀专享活动火爆进行中↓↓

腾讯云 AI 视觉产品基于流计算 Oceanus(Flink)的计费数据去重尝试相关推荐

  1. 指标统计:基于流计算 Oceanus(Flink) 实现实时 UVPV 统计

    作者:吴云涛,腾讯 CSIG 高级工程师 导语 | 最近梳理了一下如何用 Flink 来实现实时的 UV.PV 指标的统计,并和公司内微视部门的同事交流.然后针对该场景做了简化,并发现使用 Flink ...

  2. 实时监控:基于流计算 Oceanus ( Flink ) 实现系统和应用级实时监控

    作者:吴云涛,腾讯 CSIG 高级工程师 本文描述了如何使用腾讯云大数据组件来完成实时监控系统的设计和实现,通过实时采集并分析云服务器(CVM)及其 App 应用的 CPU和内存等资源消耗数据,以短信 ...

  3. 腾讯云AI应用产品总监王磊:AI 在传统产业的最佳实践

    欢迎大家前往腾讯云+社区,获取更多腾讯海量技术实践干货哦~ 背景:5月23-24日,以"焕启"为主题的腾讯"云+未来"峰会在广州召开,广东省各级政府机构领导.海 ...

  4. 腾讯云AI应用产品总监王磊:AI 在传统产业的最佳实践 1

    欢迎大家前往腾讯云+社区,获取更多腾讯海量技术实践干货哦~ 背景:5月23-24日,以"焕启"为主题的腾讯"云+未来"峰会在广州召开,广东省各级政府机构领导.海 ...

  5. 基于流计算 Oceanus(Flink) CDC 做好数据集成场景

    作者:黄龙,腾讯 CSIG 高级工程师 数据时代,企业对技术创新和服务水准的要求不断提高,数据已成为企业极其重要的资产.无论是在在企业数据中台的建设,亦或者是打造一站式数据开发和数据治理的PASS平台 ...

  6. 流计算 Oceanus | Flink JVM 内存超限的分析方法总结

    作者:董伟柯,腾讯 CSIG 高级工程师 问题背景 前段时间,某客户线上运行的大作业(并行度 200 左右)遇到了 TaskManager JVM 内存超限问题(实际内存用量 4.1G > 容器 ...

  7. 数据分析小结:使用流计算 Oceanus(Flink) SQL 作业进行数据类型转换

    作者:吴云涛,腾讯 CSIG 高级工程师 在这个数据爆炸的时代,企业做数据分析也面临着新的挑战, 如何能够更高效地做数据准备,从而缩短整个数据分析的周期,让数据更有时效性,增加数据的价值,就变得尤为重 ...

  8. 七大新品集中亮相,腾讯云AI大数据全线升级!

    近日腾讯云在北京举行大数据AI新品发布会.会上,腾讯云带来了在大数据与AI领域的最新研究成果,包括AI换脸甄别技术AntiFakes.腾讯星图以及企业画像平台等七大重磅新品,并对AI.大数据产品进行全 ...

  9. 最佳实践 | 用腾讯云AI语音识别零基础实现小程序语音输入法

    先回顾下,生活.工作中你使用过哪些语音识别相关的产品或者服务? 培训/考试相关的小程序,使用语音识别来判断回答是否正确: 英语口语练习的小程序,使用语音识别来打分: 你画我猜类的小程序,使用语音识别来 ...

最新文章

  1. 软件测试第4周小组作业:WordCount优化
  2. 计算机二级office函数日期,Excel函数-日期和文本函数-计算机二级Office
  3. python画柱状图-Python Excel 绘制柱形图
  4. linux环境下的TIME_WAIT和CLOSE_WAIT问题解决方法
  5. 面向对象chapter2
  6. C#实现关机的两种方法
  7. Unity web player-----a new version is required/insatall manually
  8. Java语言程序设计(基础篇)课后答案
  9. 国科大学习资料--模式识别与机器学习(黄庆明)--期末复习题1(含答案)
  10. dreamweaver cc php mysql_Dreamweaver cc 2015 +PHP+MySQL动态网站开发案例教程集合
  11. 计算机桌面任务栏过宽怎么处理,任务栏变宽了怎么办 还原变宽任务栏的方法【图文教程】...
  12. oracle条件索引查询,Oracle复合索引用于范围查询条件
  13. java的多态是什么意思_【Java】基础18:什么叫多态?
  14. 318. 最大单词长度乘积【我亦无他唯手熟尔】
  15. PPT VBA:批量转PDF
  16. Python通过高德地图API批量计算两地路线距离
  17. Figma又崩了,但我们P事没有
  18. logstash中无法解析nginx日志中的\x09类似字符导致服务停止
  19. QT 如何实现QLabel的点击事件
  20. Docker容器之Docker Swarm集群详解(上)

热门文章

  1. 二维码(QR code)基本结构及生成原理
  2. Uniswap再次让对手望尘莫及?关于V3的一些猜想
  3. oracle获取减一年,oracle日期减一年 oracle指定日期减一天
  4. Bandit总结3详细版
  5. 计算机桌面最小化后找不到,微博桌面最小化后找不到图标了怎么办??
  6. 敬业福终极攻略:马云的福字和这些图可多扫十几张福卡
  7. export ‘Switch‘ (imported as ‘Switch‘) was not found in ‘react-rou ter-dom‘ (possible exports:
  8. 一文读透GO语言的通道
  9. elf2flt 解释
  10. 计算机无法卸载软件,电脑软件无法卸载怎么解决