1 背景

互联网流量应用的一个重要方面是计算各页面、入口的转化,深度洞察用户行为轨迹,数据驱动精细化运营,对一些大公司来说也是如此。如何建设公司级统一口径转化导流分析数据,满足多个业务对转化数据的需求,为公司各业务部门提供转化分析数据(各入口或者页面带来的商家、活动页、预定、订单),基于此我们设计了转化归因来满足业务需求。

归因又叫挂单:用户打开某一APP,在浏览N个页面之后,遇到有下单意向的页面或订单页面;为了计算流量统计中的转化率,需要标记用户流量的行为路径;并将有意向的页面或订单页面绑定到路径的相应页面或事件上,这一过程称为归因。(有意向的页面:商家、活动页,预订页,订单页等)

2 挑战

  1. 通用归因模型:如何设计一套满足公司复杂业务场景的归因转化数据方案?我们采用了基于最短路径的归因模型,即每次来一个节点其功劳都认为是其最短路径产生的。Google Analytics中的归因模型:https://support.google.com/analytics/answer/1662518?hl=zh-Hans
  2. 归因逻辑复杂:用户路径数据复杂,如何兼容用户异常数据?例如(1)点击事件时间戳晚于下一页面的pv;(2) 当前页面上的点击事件时间戳早于当前页面pv;(3) 非首页pv事件丢失;(4) 首页pv丢失。
  3. 归因数据量大:流量数据量大,归因上游350亿条以上的数据量,3万以上的文件个数据,磁盘上ORC文件大小上T,如何去处理这么大的数据量?
  4. 归因算法设计:百亿用户行为数据如何快速计算用户最短路径,如何以节省内存的方式去做归因计算?

3 归因模型设计和复杂逻辑兼容

3.2 归因模型设计

满足一些公司各个业务通用归因模型:能够满足页面粒度、模块粒度的订单及其下单转化率计算,能满足页面或APP带来的总共订单数、总交易额的计算,能够满足各业务转化导流情况的计算等

得到用户行为路径:按每个用户上报数据的时间戳串联得到用户行为路径;

得到最短路径:从前向后遍历,新来元素要插入链表最后,插入前先从后往前查该链表有没有出现过新加入的元素,如果首次查到则停止,删除首次查到的元素到链表尾之间的所有元素,加入新元素。(即新来的元素和前面的元素构成环,则将环删除),这个过程也叫回退删除;

归因挂单:遇到订单页面(下单的页面)或有意向的页面(商户页,预订页等),把该页面挂到包含最短路径节点(用户的每条数据,可以叫一个节点)的用户行为路径节点上,输出整个用户行为路径。

e.g.回退删除:A——>B——>C——>D——>B; 新来的B在路径中出现过()的位置:A——>(B)——>C——>D——>B;此时删除中括号之间的元素:A——>[B——>C——>D]——>B,即删除了环[B——>C——>D——>B)注意左闭右开

则行为路径中的每个页面浏览或点击行为结点都是接触点,在不同的归因模型下,用户此次订单转化行为会被归因到行为路径中一个或多个接触点上。

3.2 归因复杂逻辑处理

此处不做详细介绍:主要处理首次打开非首页,交叉回退,重复下单,连续下单,僵尸节点处理,点击事件晚于下一个页面PV,非首页PV事件丢失,首页PV丢失等

4 工程实现和优化

基于Spark大数据生态体系,实现归因模型,Application基于Yarn的资源调度模式。

下面主要从算法优化和Spark任务优化来介绍工程实现和优化实践(如下调优部分来自我的CSDN博客:Spark处理百亿规模数据优化实战)。

4.1 归因算法

假设每条用户行为数据有90个字段,根据归因模型我们很容易想到把每条用户行为数据扩展到100个字段,其中新增的10个字段用来存放归因过程中归因的信息;
如下图所示模拟的归因过程:
1) 聚合排序 把同一个用户的行为数据聚合在一起,根据时间戳从小到大排序。
2) 求最短路径 维护一个list,采用回退删除的方式,计算用户的最短路径,即每来一个节点看看在list中有没有出现,没出现加入到list,否则如下图所示迭代到第二个B时,发现其在路径A——>[B——>C——>D]——>B中出现,就删除中括号的内容,保留A——>B(第二个)为最短路径。
3) 迭代归因 如果遇到有意向的页面(订单、商家、活动页)如下图中的“O”时触发归因,用HashMap维护截止当前的用户真实路径A——>B——>C——>D——>B——>O,基于最短路径list去修改HashMap中A——>B(第二个)——>O,用O这条记录中的相关信息修改对应记录扩充的10个字段。然后依次迭代接下来的路径,每来一个触点求最短路径,遇到归因触发条件基于最短路径归因。

基于上图算法模型,做了相关测试Spark资源限制9.4T(600 * 16g)~ 12.5T(800 * 16g),MR任务资源限制11.7T(1500 * 8g)两个任务生产耗时如下表

归因数据条数量级大概在50亿~100亿条(受节假日影响大),采用如上算法模型,Hadoop和spark生产耗时太长,很难满足用户需求。

分析算法模型给出了优化方案
核心理念:归因过程数据膨胀,如何以节省内存的方式去归因,是算法优化的关键。
核心思想:数学分而治之思想+索引技术灵活运用。
算法描述:不变的字段单独维护且只维护一份,供查询使用(如Array1),归因新增的字段单独封装维护在(Array2),轻量级的Array2参与归因的计算过程,归因完成,通过Array1和Array2之间的索引把数据打通落盘。

资源限制在9.4T~12.5T且数据条数大概是60亿条时,算法优化前后时效性对比如下图所示。

4.2 Spark任务优化

本节主要从内存调优、高性能算子、数据结构优化、广播大变量和小表调优、动态并行度调优、Spark文件切分策略调优来介绍Spark处理大规模数据的一些优化实践。

4.2.1 内存调优

由于归因数据量大且会发生数据膨胀,如果内存参数设置不合理,任务容易出现OOM,分析Spark1.6.2内存管理模型如下图所示,知道Spark如何管理自己的内存我们才能进行更好的调优。

归因任务内存参数配置:

spark.driver.memory=6g(存在广播所以Driver设置的较大)
spark.executor.memory=13G(因为特殊情况,这个参数最大是13G)
spark.memory.fraction=0.4

内存参数配置计算公式:
Execution Memory 2.5G =(Heap size(13G)- Reserve Memory(450M))* spark.memory.fraction 0.4 * spark.memory.storageFraction 0.5

用户主导的空间:User Memory 7.5G =(Heap size(13G)- Reserve Memory(450M))* (1 - spark.memory.fraction 0.4)

安全因子:0.9,考虑到内存空间使用和预估的准确度,实际应用过程中会考虑加入一个安全因子。
可用用户主导空间:User Memory * 0.9 = 6.8G(根据实际情况,数据条数 * 每条数据归因后占用内存最大值,基于此评估一个最大值,如果超过这个值就会出现OOM)

效果:解决程序OOM问题,因为归因过程维护了大的数据结构,其主要使用了User Memory的空间,用Spark默认内存配置会导致用户空间OOM。

4.2.2 高性能算子

归因分析的是同一个用户的行为数据,分布式处理需要把一个用户的数据抓取到一个节点上处理,有Shuffle操作,同源数据采用groupByKey时Shuffle Write数据量3.5T,aggregateByKey时Shuffle Write数据量3T,相比节省时间2~3min。

分析数据分布的特征,同一个设备的数据一般在一个文件出现的概率较大,将groupByKey算子改成 aggregateByKey,首先进行了一个Map端的聚合,减少了网络传输的数据量。

模拟代码:

val initialSeq = mutable.Seq.empty[Row]
val addToSeq = (s: mutable.Seq[Row], v: Row) => s :+ v // Map端本地聚合
val mergePartitionSeqs = (p1: mutable.Seq[Row], p2: mutable.Seq[Row]) => p1 ++ p2 // Reduce端聚合
kv.aggregateByKey(initialSeq)(addToSeq, mergePartitionSeqs)

效果:减少网络传输数据量,时效性提升了2~5min,降低网络异常导致任务失败的风险。

4.2.3 数据结构优化

归因代码中,采用更加节省内存的数据结构,例如聚合的key、最短路径中的索引(如下模拟代码所示)等多处采用字符串拼接实现,避免自定义对象封装数据,尽可能使用轻量的Array而不是HashMap等

// PageId为空加入到最短路径 不进行回退和归因
if (flagReverseIntent && isBlankPageId) {
pageIdEventStepCategory = eventStep.toString + "_" + eventCategory
shortPath.append(pageIdEventStepCategory)
}

效果:节省内存。

4.2.4 广播大变量和小表调优

归因任务为什么需要广播?我们先看一下广播的原理,如下图Executor端用到了Driver的List,如果广播List则每个Executor中只有一份Driver端的变量副本。如果不广播List,Executor有多少task就有多少Driver端的变量副本。如果对小表广播能实现本地Join,避免sortMergeJoin(如果使用SparkSQL发现不广播可以加上这个参数:spark.sql.statistics.fallBackToHdfs=true)。

归因过程需要关联一些小的维表或定义一些大的变量,并存在大量task,所以需要广播。
效果(1)降低网络传输的数据量;(2)降低内存的使用;(3)加快程序的运行速度。

通过如上四种Spark任务优化,使任务运行更加稳定,同时也节省了内存。

注意:采用SQL对分区表进行广播,会遇到整个表比较大(超过广播限制的大小10M),但是分区下数量并不大,导致SQL里无法广播,此时需要开启一个参数便可广播(spark.sql.statistics.fallBackToHdfs=true)

4.2.5 动态并行度调优

流量数据节假日数据量明显增大,是正常值的1~2倍,为了保障数据稳定生产,归因链条包括“数据清洗任务”和“归因任务”,“数据清洗任务”主要是从300亿+条数据清洗出归因需要的50亿~100亿数据,并且把上游上万个大小不同的文件合并成固定个数和大小,这个任务产出的文件个数和大小对“归因任务”是有影响的,如何确定这个文件大小和个数?

在数据量相同且资源配置相同条件下,要保证归因在1h内完成,测试单Task处理不同文件大小或不同数据条数的执行情况:

(1)处理文件大小200M~256M左右,每个Task处理条数大概60万左右,Task平均执行时长8~10min;如图:每个Task处理文件大小为247M,其中75%的Task执行时长为10min,而且max值为17min,就算max失败,Task失败重新执行也不会影响到任务整体结束时间。

(2)处理文件大小256~285M左右,每个Task处理条数大概70万左右,Task平均执行时长12min左右;如图:每个Task处理文件大小为271M,其中75%的Task执行时长为12min,而且max值20min,Task失败重新执行对任务整体时效性有影响。

(3)处理文件大小300M~350M时,每个Task处理数据条数80万左右,Task平均执行时长17min左右。如图:每个Task处理文件大小313M,其中75%的Task执行时长为17min,而且max值太大,这样就拖慢了整个运行过程。

通过大量测试发现,Task平均执行时长8~10min,每个Task处理条数大概60万左右时归因任务遇到失败Task、慢节点、某台机器故障等在开启慢任务推测情况下表现较优。

慢任务推测参数配置:

spark.speculation=true
spark.speculation.interval=60s
spark.speculation.multiplier=1.3
spark.speculation.quantile=0.99

基于如上测试,对“数据清洗任务”产出文件数量进行动态调整,让文件大小尽量在200M~256M左右,文件数量和大小,可以根据历史数据条数、文件个数、每个文件大小,考虑节假日等情况去预估,当然也可以采用机器学习等算法去预测。比如简单计算:本周日文件个数 = 上周日数据总条数/60万 ,因为数据清洗任务后数据量大概是50亿~100亿条,所以文件个数阈值为[7000,16000]

依据每个Core处理2~3个Task,每个Task处理60万条数据文件大小为200~256M表现较好,开启了Executor动态资源分配功能如下:

spark.dynamicAllocation.minExecutors=1000
spark.dynamicAllocation.maxExecutors=1500

参数:spark.default.parallelism是控制Shuffle并行度的,从而会影响Spark Task个数,间接影响文件产出个数。
“数据清洗任务”是一个离线按天执行的任务,通过动态调整spark.default.parallelism的值保证产出文件个数和大小。
“归因任务”是一个离线按天执行的任务,通过动态调整spark.default.parallelism的值,进一步保证归因过程每个Task处理的文件维持在256M左右,数据条数维持在60万左右。

效果:避免了节假日任务执行超时/任务失败,保证生产耗时的相对平稳。

通过如上参数调整,提高并缩短了生产耗时稳定性,主要是扩大Executor个数(资源才是王道),生产耗时缩短到了31min左右,如下图所示

4.2.6 Spark文件切分策略调优

归因任务上游的“数据清洗任务”,会清洗出归因需要的有效数据,并且对上游上万个小文件进行合并压缩成ORC文件,其文件大小在256M左右。

1)归因任务遇到问题:作业提交后ApplicationMaster(Driver)启动了,Spark任务长时间占用资源,SparkUI看不到DAG图、Stage、Partition和Task相关的信息。
1)问题分析:Driver启动,但是Executor没干活,说明问题出在了Driver,Driver干什么呢?定位到Driver在计算Partition,发生了Full GC,于是问题定位到了Spark读取文件的方法OrcInputFormat.java。
1)通俗描述:老大(Driver)管理小弟(worker)干活,本来是老大把活分给小弟就可以了,但是老大一直在了解小弟的情况,自己很忙小弟很闲。
1)问题跟踪:查看OrcInputFormat.java发现Spark读取ORC文件有三种策略,默认采用HYBRID策略(HiveConf.java有相关配置信息):Spark Driver启动的时候,会去nameNode读取元数据,根据文件总大小和文件个数计算一个文件的平均大小,如果这个平均值大于默认256M的时候就会触发ETL策略。ETL策略就会去DataNode上读取orc文件的head等信息,如果stripe个数多或元数据信息太大就会导致Driver 产生FUll GC,这个时候就会表现为Driver启动到Task执行间隔时间太久的现象。
1)解决方案:控制文件大小为256M左右,改变文件切分策略为BI,控制stripe大小。

// 创建一个支持Hive的SparkSession
val sparkSession = SparkSession
.builder()
.appName("PvMvToBase")
// 默认64M,即代表在压缩前数据量累计到64M就会产生一个stripe。与之对应的hive.exec.orc.default.row.index.stride=10000可以控制有多少行是产生一个stripe。
// 调整这个参数可控制单个文件中stripe的个数,不配置单个文件stripe过多,影响下游使用,如果配置了ETL切分策略或启发式触发了ETL切分策略,就会使得Driver读取DataNode元数据太大,进而导致频繁GC,使得计算Partition的时间太长难以接受。
.config("hive.exec.orc.default.stripe.size", 268435456L)
// 总共有三种策略{"HYBRID", "BI", "ETL"}), 默认是"HYBRID","This is not a user level config. BI strategy is used when the requirement is to spend less time in split generation as opposed to query execution (split generation does not read or cache file footers). ETL strategy is used when spending little more time in split generation is acceptable (split generation reads and caches file footers). HYBRID chooses between the above strategies based on heuristics."),
// 如果不配置,当orc文件大小大于spark框架估算的平均值256M时,会触发ETL策略,导致Driver读取DataNode数据切分split花费大量的时间。
.config("hive.exec.orc.split.strategy", "BI")
.enableHiveSupport()
.getOrCreate()

调整这个参数可控制单个文件中stripe的个数,不配置单个文件stripe过多,影响下游使用,如果配置了ETL切分策略或启发式触发了ETL切分策略,就会使得Driver读取DataNode元数据太大,进而导致频繁GC,使得计算Partition的时间太长难以接受。

问题根源Driver压力太大,Worker启动了也只能闲等Driver忙完了,进行分配调度。

效果:提升了任务稳定性。

5 作者简介

专注于大数据技术(Spark、Hadoop)、数据仓库技术、数据分析应用等领域。联系邮箱: aijiudu@163.com

6 引用
https://blog.csdn.net/aijiudu/article/details/72353510
https://blog.csdn.net/aijiudu/article/details/78616064
https://blog.csdn.net/aijiudu/article/details/55811464
https://blog.csdn.net/aijiudu/article/details/78032663

百亿条数据复杂业务场景下通用归因模型设计实现相关推荐

  1. Python对百亿条(100GB)数据进行查重

    前言 在我们处理大数据过程中,总会有找出重复数据的情况,上百亿条数据中找出重复的数据,看起来比登天还难,但Python告诉你几行代码轻松搞定这一切!不说废话,上干货! 生成72个BIN文件,每个BIN ...

  2. 搜索推荐业务场景下的特征系统搭建

    转载:https://zhuanlan.zhihu.com/p/79874983?utm_source=wechat_session 前提:前阵子受朋友的邀约,结合自己在推荐搜索系统下的经验,对企业级 ...

  3. 7月第2周业务风控关注 |涉嫌侵犯数百亿条公民个人信息 上市公司数据堂被公安一锅端...

    易盾业务风控周报每周呈报值得关注的安全技术和事件,包括但不限于内容安全.移动安全.业务安全和网络安全,帮助企业提高警惕,规避这些似小实大.影响业务健康发展的安全风险. 1.涉嫌侵犯数百亿条公民个人信息 ...

  4. 12、分布式搜索引擎在几十亿数据量级的场景下如何优化查询性能?

    1.面试题 es在数据量很大的情况下(数十亿级别)如何提高查询效率啊? 2.面试官心里分析 问这个问题,是肯定的,说白了,就是看你有没有实际干过es,因为啥?es说白了其实性能并没有你想象中那么好的. ...

  5. 每天近百亿条用户数据,携程大数据高并发应用架构涅槃

    互联网二次革命的移动互联网时代,如何吸引用户.留住用户并深入挖掘用户价值,在激烈的竞争中脱颖而出,是各大电商的重要课题.通过各类大数据对用户进行研究,以数据驱动产品是解决这个课题的主要手段,携程的大数 ...

  6. MongoDB在58同城百亿量级数据下的应用实践

    58同城作为中国最大的生活服务平台,涵盖了房产.招聘.二手.二手车.黄页等核心业务.58同城发展之初,大规模使用关系型数据库(SQL Server.MySQL等),随着业务扩展速度增加,数据量和并发量 ...

  7. 神策数据成林松:数据智能在业务场景下的应用(附 PPT 下载)

     在神策 2020 数据驱动用户大会「上海站」现场,神策数据业务咨询师成林松分享了<数据智能在业务场景下的应用>的演讲.(文末附 PPT 下载地址) 本文根据其演讲内容整理,数据均为虚拟. ...

  8. 4000GB、数百亿条个人信息泄露!大数据行业知名企业数据堂被查

    个人信息安全对我们每个人来说都很重要,但却面临着巨大的威胁. 知名大数据企业涉嫌侵犯数百亿条公民个人信息 7 月 8 日,据新华网"新华视点"报道,山东日前成功破获一起特大侵犯公民 ...

  9. 面对百亿用户数据,日均亿次请求,携程应用架构如何涅槃?

    https://www.infoq.cn/article/ctrip-big-data-high-concurrency-applications-architecture?utm_campaign= ...

最新文章

  1. FC8下安装mplayer
  2. LeetCode-剑指 Offer 28. 对称的二叉树
  3. CodeForces - 1307D Cow and Fields(最短路+思维)
  4. mysql中ifnull和hive中if函数的转换
  5. 经常遇到的10大C语言基础算法(珍藏版源码)
  6. 2018-2019-2 《Java程序设计》第6周学习总结
  7. C#中的CultureInfo类
  8. linux nfs spec,创建 NFS Ubuntu Linux 服务器卷 - Azure Kubernetes Service | Microsoft Docs
  9. iOS UISearchBar 在界面跳转时出现灰色背景问题
  10. java inflater_Android下LayoutInflater的正确使用姿势
  11. python 驱动级鼠标_电竞极速鼠标
  12. vb.net 教程 2-13 Windows API 函数
  13. 单片机原理及接口技术--8051汇编语言学习(LED流水灯实验)
  14. 接口测试平台代码实现16:吐槽功能后台实现+orm初识
  15. Vue-----table 控件自动勾选全选框2 与tab控件组合使用
  16. Js根据拼音首字母分组
  17. Bose700降噪体验
  18. android 对象 保存,Android使用SharedPreferences保存对象
  19. 从18路诸侯讨董卓,谈如何对抗51%攻击
  20. python通过关键字搜索淘宝商品详细信息

热门文章

  1. IOS14怎么降级回IOS13
  2. 数据结构(24考研复习版)
  3. 微信分享的链接,手机打开白屏,单页面应用(Hash模式下),实现微信分享
  4. 5G 700M频段采用的双工模式是
  5. 密信电子签名服务启用新域名、新品牌、新产品和新网站
  6. vb.net 使用A star
  7. 萌新推荐初学者常用软件、网站资源。
  8. 使命召唤手游怎么显示服务器无响应,《使命召唤手游》无法进入怎么办 无法进入解决教程...
  9. 某软件公司全国各大学招聘实录
  10. linux node 命令无效,完美解决linux下node.js全局模块找不到的情况