精选30+云产品,助力企业轻松上云!>>>

大家好,我是来自聚美优品刷宝大数据部门的吴攀刚,本文将跟大家分享 PyFlink 在刷宝的应用,包括:背景介绍、架构演进、技术选型以及一个问题的解决思路分享。
刷宝是一款短视频 APP,涵盖短视频、直播视频等内容,为用户提供快乐视频和优质的主播。在 来到聚美之前,我主要做离线数仓开发和数据开发,来到刷宝之后,部门也并没有现成的实时框架,需要自行搭建。 所以,当实时的需求来到我面前的时候,内心是忐忑的。
下面我将分享下,我与 PyFlink 的缘分。

1.背景介绍

业务场景
刷宝有许多重要的业务场景,其中之一是为用户实时推荐短视频。其中 推 荐的实时性,决定了用户在视频上的停留时长、观看视频时长、留存等指标,进而影响到广告位的收益,比如广告的单价等。
刷宝从 2019 年开始,业务飞速发展,截止到 2020 年 5 月份,用户行为数据峰值每秒过百万,每天有 200 亿数据。这个业务量,对我们现有的技术架构、数据计算的实时性提出了挑战。

实时化挑战

我们的数据流程 整个环节完成需要1小时左右时间,远达不到实时的要求。如何更快速的根据用户浏览习惯实时推荐相关视频 会对用户观看视频时长、停留时长、留存等有重大的影响,比如在现有基础上提升10-20%。
我们更期望数据的计算实时化,也就是将原有技术架构中的批量计算(hive)变成实时计算(Flink SQL),架构图如下。

2.架构演进

架构演进

  • 第一层:最开始是离线计算,完成一次计算需要30分钟,还不包括后续的模型处理;

  • 第二层:考虑实时计算后,我们打算采取 Flink 架构来处理,整体主件过程如图;

  • 第三层:考虑到人力和时间等成本,还有技术人员技能匹配度,最终选择第三层;

我们成员更多的是对 Python 和 SQL 熟悉,所以 PyFlink 更加适合我们。我们用  PyFlink 开发了 20 个业务作业,目前每秒 过百万,每天有 200 亿,业务平稳运行(P yF link 1.10)。

3.技术选型

面对实时化的业务和架构升级需求,我们团队本身没有 Spark、Flink 等框架的背景积累,但是一个偶然的机会,我们观看了金竹老师的直播,了解到了 PyFlink 是 Flink 的 Python API 和我团队现有的开发人员语言技能比较吻合。所以就想利用 PyFlink 进行业务的实时化升级。

看完金竹老师的分享,我对 PyFlink 有了一个简单的了解,就和团队同学一起规划了解 PyFlink,进行技术选型。
初识与困难
虽然 PyFlink 和团队的语言技能比较 match,但是其中还是涉及到很多 Flink 的环境、文档、算子等的使用问题,遇到了很多困难:

  • PyFlink 的知识文档、示例、答疑等都非常少,除了官网和阿里云,基本无其他参考。

  • PyFlink 官方文档缺少很多细节,比如:给了方法不给参数格式。

  • PyFlink 的内容不明确,官网上没有明确具体写出哪些 PyFlink 没有,哪些有。没法将 Flink 和 PyFlink 清晰的区分开。

  • PyFlink 本身等局限性,比如:left/rigint Join 产生 retraction 无法写入 Kafka,要写入需要改写 Flink SQL 让流改为 append 模式,或者修改 kafka-connector 源码支持 retraction。

所以一时感觉利用 PyFlink 的学习时间也比较漫长。大家比较担心短时间内很难满足业务开发。   机遇
在我和团队担心开发进度时候,我也一直关注 Flink 社区的动态,恰巧发现 Flink 社区在进行 “PyFlink 扶持计划”,所以我和团队都眼前一亮,填写了 PyFlink 调查问卷。也和金竹老师进行了几次邮件沟通。最终有幸参与了 PyFlink 社区扶持计划。

4. OOM 报错解决思路分享

其实了解下来 PyFlink 的开发是非常便捷的,在完成了第一个作业的开发之后,大家逐渐熟悉 PyFlink 的使用,3周左右就完成了 20 个业务逻辑的开发,进入了测试阶段。这个快速一方面是团队成员不断的熟悉 PyFlink,一方面是由社区 PyFlink 团队金竹/付典等老师的帮助和支持。这里,不一一为大家分享全部内容,我这里列举一个具体的例子。
■ 背景:
从接触到 Flink 开始,有个别 job,一直有 running beyond physical memory limits 问题。多次调整 tm 内存,修改 tm 和 slos 的比例,都没用,最终还是会挂。最后妥协的方案是,增加自动重启次数,定期重启任务
■ 现象:
Flink job 通常会稳定运行5-6天,然后就报出这个错误。一直持续和反复。
■ 详细信息:

Closing TaskExecutor connection container_e36_1586139242205_122975_01_000011 because: Container [pid=45659,containerID=container_e36_1586139242205_122975_01_000011] is running beyond physical memory limits. Current usage: 4.0 GB of 4 GB physical memory used; 5.8 GB of 32 GB virtual memory used. Killing container.    Dump of the process-tree for container_e36_1586139242205_122975_01_000011 :    |- PID PPID PGRPID SESSID CMD_NAME USER_MODE_TIME(MILLIS) SYSTEM_TIME(MILLIS) VMEM_USAGE(BYTES) RSSMEM_USAGE(PAGES) FULL_CMD_LINE    |- 45659 45657 45659 45659 (bash) 0 0 115814400 297 /bin/bash -c /usr/local/jdk//bin/java -Xms2764m -Xmx2764m -XX:MaxDirectMemorySize=1332m -XX:+HeapDumpOnOutOfMemoryError -XX:HeapDumpPath=/tmp/test.bin -Dlog.file=/data/emr/yarn/logs/application_1586139242205_122975/container_e36_1586139242205_122975_01_000011/taskmanager.log -Dlogback.configurationFile=file:./logback.xml -Dlog4j.configuration=file:./log4j.properties org.apache.flink.yarn.YarnTaskExecutorRunner --configDir . 1> /data/emr/yarn/logs/application_1586139242205_122975/container_e36_1586139242205_122975_01_000011/taskmanager.out 2> /data/emr/yarn/logs/application_1586139242205_122975/container_e36_1586139242205_122975_01_000011/taskmanager.err     |- 45705 45659 45659 45659 (java) 13117928 609539 6161567744 1048471 /usr/local/jdk//bin/java -Xms2764m -Xmx2764m -XX:MaxDirectMemorySize=1332m -XX:+HeapDumpOnOutOfMemoryError -XX:HeapDumpPath=/tmp/test.bin -Dlog.file=/data/emr/yarn/logs/application_1586139242205_122975/container_e36_1586139242205_122975_01_000011/taskmanager.log -Dlogback.configurationFile=file:./logback.xml -Dlog4j.configuration=file:./log4j.properties org.apache.flink.yarn.YarnTaskExecutorRunner --configDir .    Container killed on request. Exit code is 143    Container exited with a non-zero exit code 143

我们的解决思路:
        1. 从内容上看是 oom 问题,所以一开始调整了 tm 大小,直接到最大内存,2调整 tm 和 slot 的比例,尽量做到 1v1.         2. dump heap 的内存,分析占用情况。         3. 调整 backend state 的类型
结果:以上手段都失败了,在持续一段时间后,依然一定报错。
PyFlink 团队处理思路:
1.分析当前作业的 state 情况,作业情况,作业环境参数情况 。通过 flink-conf 可以看 backend state 情况,通过 flinkdashboard 可以知道作业图和环境参数。   2. 由于 1.10 中,rocksdb statebackend 占用的内存默认为非 managed memory,通过在 PyFlink 作业中增加如下代码,可以将其设置为 managed memory:env.get_state_backend()._j_rocks_db_state_backend.getMemoryConfiguration().setUseManagedMemory(True)   3. 为了分析 OOM 是否是由于 rocksdb statebackend 占用的内存持续增长导致的,开启了关于 rocksdb 的监控,因为我们使用的是 rocksdb,这里需要在 flink-conf 中增加如下配置:

state.backend.rocksdb.metrics.block-cache-capacity: truestate.backend.rocksdb.metrics.block-cache-usage: true                            state.backend.rocksdb.metrics.num-running-compactions: truestate.backend.rocksdb.metrics.num-running-flushes: truestate.backend.rocksdb.metrics.size-all-mem-tables: true

然后通过自建的 metrics 系统来收集展示和分析,我们使用的 grafana。   4. 通过前面的步骤,观察到 rocksdb 的内存基本是稳定的,内存占用符合预期,怀疑是“rocksdb 超用了一点点,或者是 jvm overhead 不够大”导致的。这两种问题,都可以通过调整 jvm overhead 的相关参数来解决。于是在 flink-conf 中添加了配置:

taskmanager.memory.jvm-overhead.min: 1024mtaskmanager.memory.jvm-overhead.max: 2048m

用大佬的原话:rocksdb 超用了一点点,或者是 jvm overhead 不够大,这两种情况调大 jvm overhead 应该都能解决。   5. 调整 flink.size 的大小,让 flink 自动计算出 process.size,这部分在 flink-conf:

taskmanager.memory.flink.size: 1024m

完成所有调整后,经历了14天的等待,job 运行正常,这里充分说明了问题被解决了。同时开始观察 rocksdb 的 metrics 情况,发现 native 内存会超用一些,但是 rocksdb 整体保持稳定的。目前能判断出某个地方用到的 native 内存比 flink 预留的多,大概率是用户代码或者第三方依赖,所以加大下 jvm-overhead 大数值,能解决问题。   6. 最终需要修改的参数有:
1) 在 pyflink 作业中增加如下代码:

env.get_state_backend()._j_rocks_db_state_backend.getMemoryConfiguration().setUseManagedMemory(True)

2) flink-conf 修改或增加:

taskmanager.memory.jvm-overhead.min: 1024mtaskmanager.memory.jvm-overhead.max: 2048mtaskmanager.memory.process.size: 6144m

其实针对这个业务升级,老板为了不影响最终的业务上线,起初我们准备了2套方案同时进行:

  • 基于某个云平台进行平台搭建和开发;

  • 基于开源 PyFlink 进行代码开发;

两个方案同时进行,最终我们团队基于 PyFlink 开发快速的完成了业务开发和测试。最终达到了我前面所说的每秒百万/每天200亿的稳定业务支撑。
重点,重点,重点,参与这个业务升级的开发只有2个人。

5.总结和展望

通过 PyFlink 的学习,刷宝大数据团队,在短时间能有了实时数据开发的能力。目前稳定运行了 20+PyFlink 任务,我们对接了多个需求部门,如推荐部门、运营、广告等;在多种场景下,模型画像计算、AB 测试系统、广告推荐、用户召回系统等,使用了 PyFlink。为我们的业务提供了坚实稳定的实时数据。
此外,我们将搭建 Flink on Zeppelin 这样的实时计算平台,扩大 Flink 开发用户群体,进一步简化 Flink 开发成本。Flink 1.11 版本也准备上线,Python UDF 功能会有进一步的优化,Pandas 模块也会被引入。假如读者和我们一样,期望能快速拥有实时的能力,以 Python 语言为主,并且还有数据开发/数仓的能力,PyFlink 将是不二之选。

如果您也对 PyFlink 社区扶持计划感兴趣,可以填写下方问卷,与我们一起共建 PyFlink 生态。

  • 了解详情: PyFlink 社区扶持计划正式上线
  • 参与方式: 点击「 阅读原文 」填写问卷即有机会加入哦

专注大数据技术、架构、实战

关注我,带你不同角度看数据架构

在看」吗?

本文分享自微信公众号 - 大数据每日哔哔(bb-bigdata)。
如有侵权,请联系 support@oschina.cn 删除。
本文参与“OSC源创计划”,欢迎正在阅读的你也加入,一起分享。

PyFlink 在聚美优品的应用实践相关推荐

  1. 【干货下载】聚美优品、中商惠民、倍全,如何让数据驱动“新零售”落地?...

    本文提炼自神策数据创始人 & CEO 桑文锋在第五届中国全渠道峰会上发表的主题为<大数据驱动线上线下场景融合>的演讲.PPT 干货获取请点击文末"阅读原文". ...

  2. 聚美优品 html 资源,跨界整合行业资源 聚美优品向阳而生

    2010年,聚美优品横空出世,凭借着首创"化妆品团购"模式迅速占领了市场并赢得了消费者的认可.一年之后也就是2011年,聚美优品注册用户超过了100万,并且销售额突破了5亿.201 ...

  3. 聚美优品范忱:我是如何将用户推荐准确率提升 10% 的?

    传统的商业模式如今已在悄无声息中被"革命",零售业的出路不仅是线上线下的成功融合,更源于对用户数据的收集.整理.分析,深度洞察用户需求,实现可预测.可指导,从而驱动业务发展与科学决 ...

  4. 资本寒冬之下的聚美优品私有化

    随着最终协议的签订,聚美优品私有化终于要尘埃落定. 2月26日,在隔夜美股再次暴跌,道指狂跌近900点,史上首次连续两日暴跌超800点的背景下,此前一路下挫的聚美优品,股价却得以逆势上涨26%,报19 ...

  5. 聚美优品API 根据关键词取商品列表 Onebound电商平台数据

    万邦聚美优品根据关键词取商品列表 API 返回值说明 item_search-根据关键词取商品列表   API test onebound.jumei.item_search 公共参数 请求地址: 跨 ...

  6. 聚美优品启动上市,真的那么赚钱吗?

    新一轮中概股密集上市窗口看来已是全开.京东之后,又一家电商公司正式提交ipo申请--是聚美优品,看起来在经历过去3年间跌宕沉浮之后,电商的ipo战还是主拼运营效率. 主营业务5800美元的利润,带着中 ...

  7. 电商战决胜在物流 聚美优品破瓶颈发展

    随着"红六月"电商价格战的"高潮"逐渐退去,广大网购一族们又开始沉浸在漫长的等待收货过程中.不可否认,与商场购物相比,网络购物虽然提供了便利和实惠,但消费者需要 ...

  8. 爆聚美优品售假货,中国老龄商城有话说

    近日,聚美优品因其"301大促"着实火了一把,但连带着的"假货"质疑声也将其推上风口浪尖,甚至有网友将其调侃成"巨没有品"."官方 ...

  9. 聚美优品店庆nbsp;从天堂到地狱

    2019独角兽企业重金招聘Python工程师标准>>> 编者按:今天上班,无意中看到好123导航网站上的"聚美优品"那几个字加上了"热"的标签 ...

最新文章

  1. 面试官:为什么要尽量避免使用 IN 和 NOT IN 呢?
  2. android 数据库表格数据库数据库中,Android SQLite数据库中的表详解
  3. IntellIJ IDEA 配置 Maven 以及 修改 默认 Repository
  4. 算法-排序-k排序(算法导论第三版第八章思考题8-5)
  5. 混合云异军突起 英特尔的全“芯”体验为企业保驾护航
  6. ppt2003 未找到 mathpagewll 53_steam每日特惠:《伊拉图斯:死之主》今日史低53元
  7. java如何实现不固定个数传参
  8. xshell连接成功但无法输入命令_如何解决cisco设备无法进入系统问题?
  9. python圆形按钮_小白用python写个贪吃蛇给小白看
  10. 爬取豆瓣电影储存到数据库MONGDB中以及反反爬虫
  11. 拓端tecdat|R语言有限混合模型聚类FMM、广义线性回归模型GLM混合应用分析威士忌市场和研究专利申请、支出数据
  12. Ubuntu18.04(Gnome桌面)主题美化,Mac私人定制
  13. MQTT、CoAP 还是 LwM2M?主流物联网协议如何选择
  14. SEEK学习论坛-JavaWeb开发实训课题 (数据库MySQL+js+Ajax+Servlet)代码分享篇
  15. 【答读者问8】backtrader中多股票回测时停牌等缺失数据的处理方法
  16. 解决SQLServer2000安装被挂起的方法
  17. mysql数据查询(DQL)
  18. mysql 导入 没有数据库文件怎么打开_没有SQL Server数据库时如何打开.MDF文件
  19. C++ Error C2280 尝试引用已删除的函数
  20. python(14)--集合

热门文章

  1. webrtc后续补丁
  2. 如何从程序员转型为项目经理
  3. (转)使用异步Python 3.6和Redis编写快速应用程序
  4. iOS25个性能优化,和内存优化
  5. Python 实现网络爬虫小程序
  6. 2013.3.9学习日记
  7. Node.js框架对比:Express/Koa/Hapi
  8. 容器编排技术 -- 使用Minikube 部署 Kubernetes 集群
  9. Nacos支持三种部署模式
  10. Spring @Autowired Annotation教程