Sparksql在处理一些具体的业务场景的时候,可以通过算子操作,或者RDD之间的转换来完成负责业务的数据处理,在日常做需求的时候,整理出来一下几个经典的业务场景的解决方案,供大家参考。

1、取商家任务(task=1,2,3)全部完成的最早时间(注意如果任务3没有完成,则表中无3的数据,这种情况下全部完成时间为空)

业务背景:

商家在开通店铺服务的时候,会由商家服务人员去跟进商家完成开店任务,如:创建店铺(task_id=1),完成交易(task_id=2),创建营销活动(task_id=3),那么在考核服务人员是否做好服务的定义是:商家在一个月内是否完成所有开店的任务,因此需要统计商家完成全部任务的最早时间,以判断服务的好坏。

原始数据:

原始数据:
table:test
shop_id    |task_id    |finish_time
001        |1          |2020-03-01 09:00:00
001        |1          |2020-04-01 09:00:00
001        |2          |2020-03-12 09:00:00
001        |3          |2020-03-10 09:00:00
001        |3          |2020-03-02 09:00:00
002        |1          |2020-04-01 09:00:00输出结果:
shop_id    |finish_time
001        |2020-03-12 09:00:00
002        |

分析:
1、每个店铺都会有3个流程,只有流程走完才会有最早完成时间。

2、每个流程都会有多次的完成时间,同一个店铺同一个流程要取最早的完成时间。

3、不同流程完成时间中取最早的完成时间为这个店铺的最后的最早完成时间。

解决方案:
1、先按照shopid,task_id作为主键来获取每个店铺、每个任务节点的最早完成时间,那么得出结果如下:

shop_id    |task_id    |finish_time
001        |1          |2020-03-01 09:00:00
001        |2          |2020-03-12 09:00:00
001        |3          |2020-03-02 09:00:00
002        |1          |2020-04-01 09:00:00

2、然后按照shopid做为主键,对task_id进行聚合统计,对finish_time进行排序获取最新的时间,得出结果如下:

shop_id    |task_num    |finish_time
001        |3          |2020-03-12 09:00:00
002        |1          |2020-04-01 09:00:00

3、判断task_num个数是否为3,如果为3,那么店铺完成所有的业务,就输出这一行,如果不为3,那么未完成所有业务,finish_time变为null,结果如下:

shop_id    |task_num    |finish_time
001        |3          |2020-03-12 09:00:00
002        |1          |

Spark的处理逻辑:

  val DF = Spark.sql("select shop_id,task_id,unix_timestamp(finish_time) as ft from test")val RDD = DF.rdd.map(f => ((f.getAs[String]("shop_id"),f.getAs[Int]("task_id")),f.getAs[Long]("ft"))).groupByKey().map(f => {val shop_id = f._1._1val task_id = f._1._2val list = f._2.toList.sortWith(_ < _)(shop_id,task_id,list.head)}).map(f => (f._1,(f._2,f._3))).groupByKey().map(f => {val shop_id = f._1val list = f._2.toList.sortWith(_._2 > _._2)if (list.length == 3){(shop_id,list.length,list.head._2)}else(shop_id,list.length,0L)})

2、取登陆用户的最大连续登陆天数。

业务场景:

某C端APP,每天会记录登陆用户的登陆时间,然后需要统计用户在一段周期内的最长连续登陆的天数/或者没有登陆的天数。

同时这个业务场景在监控里面也可以使用:例如取数据表中最近连续稳定(数据量不变)的天数等等。

原始数据:

user_id    |ds
001        |2020-03-01
001        |2020-03-02
001        |2020-03-03
001        |2020-03-04
001        |2020-03-06
001        |2020-03-07
002        |2020-03-01
002        |2020-03-04
001        |2020-03-05

结果:

user_id    |num
001        |4
002        |2

分析:

这块主要处理的问题是连续登陆的问题,如何取判断用户是连续登陆。

1、对用户的登陆时间进行排序;

2、计算每两个时间的时间差,如果对应的时间差为1天,那么就是连续登陆,如果大于1,则为非连续;

3、统计时间差对应数组中连续为1的最大长度就是最大的连续登陆天数。

Spark的处理逻辑:

val DF = Spark.sql("select uid,unix_timestamp(ds) as dt from test")val RDD = DF.rdd.map(f => (f.getAs[String]("uid"),f.getAs[Long]("dt"))).groupByKey().map(f => {val uid = f._1val ir = f._2.toBuffer.sortWith(_ < _)var array: Array[Long] = Array()var num = 0Lfor (i <- 0 to ir.length - 2) {val subTime = ir(i + 1) - ir(i)val during = subTime/86400Lif (during == 1L){num = num+1Larray = array :+ num}else{num = 0L}}(uid,array.max)})

原理:

例如:
array如下(也就是时间差对应的数组):
Array[Long] = Array(1, 2, 1, 1, 1, 2, 1, 1, 1, 1)var num = 0Lvar arr: Array[Long] = Array()for (i <- 0 to array.length - 1) {if (array(i) == 1L){num = num + 1Larr = arr :+ num}else{num = 0L}}
输出:
arr:
Array[Long] = Array(1, 1, 2, 3, 1, 2, 3, 4)
而arr.max = 4 也就是最大连续登陆的天数。

3、如何让业务方能够自由筛选当天分钟级别的新增访问用户数。

业务背景:

在做flink的实时大屏统计的时候,只能选在到当天当前这个时刻的新增用户数有多少,但是业务方需要通过时间筛选,可能在8点30的时候,需要去看到8点25的时候,今天新增了多少访问用户,而且这个时间区间是随机的,而且是到分钟维度的。

分析:
如果数据量小的情况下:

通过canal监听业务库的binlog,然后写到Kafka通过flink进行binlog解析,生成用户的第一次登陆时间写到mysql,供后端同学通过业务逻辑进行筛选,就可以达到任意区间,任意范围的新增访问用户的圈选。

但是在C端数据量偏大的情况下,显然不能存储全量数据,就算存储也不能按照hive的方式存储,uid + fisrt_time这种模式进行存储。

那么数据量大的情况下,如何解决呢:

1、可以按照分钟进行存储,数据的主键就是时间戳到分钟级别的,然后统计每分钟第一次访问的用户量,那么一天的数据也就是1440行,每一行存的就是第一次访问时间在这个分钟内的用户量。

time_min    |num
2020-08-18 09:01:00        |4002
2020-08-18 09:02:00        |5002
2020-08-18 09:03:00        |5202

这样存储之后后端可以通过时间区间进行筛选后相加得到某个分钟级别区间端的第一次访问的用户数据。

2、不过上面的方案有个缺陷,虽然将用户维度为主键修改为分钟维度的主键,数据量减少了很多,但是可能业务方需要的不仅仅是用户量,还要具体的用户ID,来针对性进行投放,那么上面的方案就不太适合了。

针对上面的业务场景,可以选用Hbase进行优化。

rowKey    |uid
2020-08-18 09:01:00+timetamp        |{uid1,uid2,uid3}
2020-08-18 09:02:00+timetamp        |{uid21,uid23,uid33}
2020-08-18 09:03:00+timetamp        |{uid13,uid24,uid35}

由于Hbase本身是列存储的,如果将分钟级别的时间戳作为RowKey,是可以很快的定位到数据所在的位置,不必进行全表扫描,这样查询效率会很快。

不过这个场景没有验证过,但是在用户画像的需求中是通过这个逻辑来实现秒级别的查询的。

高威:浅谈Hbase在用户画像上的应用​zhuanlan.zhihu.com

4、递归的方式来解析JSON串(树结构)

业务背景:

在处理IM需求的时候,需要对客服的评价进行打分,而客服的评价系统是分为多个层级,不同类型,当初设计这个层级关系的时候是按照树结构进行涉及的,最多能下层4集合,但是每一层的都会有具体行为的选择和对应的得分情况。某一个层级可以包含多个下属层级。

具体结构如下:

层级架构如下:

分析:
1、本身是一个数组,数组的元素是JSON串,基本字段一致,每一层级都是包含基本字符串信息:level,id,lbalel,value,parentID,children。

2、children的value也是一个数组,和上面的数组模式一样同时包含全部字段。

3、最后的层级最多到第四层结束,或者说是判断最后对应的children的值是一个空数组结束。

所以这个模式可以利用递归进行调用解析,最后的判定条件是children的值是否为空为止。

代码模式:

var res = new ArrayBuffer[(String,String,String,String,String)]()def JsonFunc(Son: String,rest:ArrayBuffer[(String,String,String,String,String)]): ArrayBuffer[(String,String,String,String,String)] = {val jsonArray = JSON.parseArray(Son)if (jsonArray.size() == 0)  {println("last")}else{for (i <- 0 to (jsonArray.size() - 1)) {val jsonObj = jsonArray.getJSONObject(i)val label = jsonObj.getOrDefault("label", null).toStringval value = jsonObj.getOrDefault("value", null).toStringval id = jsonObj.getOrDefault("id", null).toStringval level = jsonObj.getOrDefault("level", null).toStringval parentID = jsonObj.getOrDefault("parentID", null).toStringres =  res :+ (id,level,parentID,label,value)JsonFunc(jsonObj.getOrDefault("children", null).toString,res)}}res}

如何用递归处理一个数组中的数据成为一个树结构_Spark处理的一些业务场景(持续更新ing)...相关推荐

  1. js从一个数组中筛选出另一个数组中存在的值

    js从一个数组中筛选出另一个数组中存在的值 这里从arr中筛选arr1中存在的值,arr2为筛选结果数组 let arr=["1","2","3&qu ...

  2. vue+js 从一个数组中删除在另一个数组中已存在对象;

    数组,对象常用的删除方法: 1.根据一个数组元素,删除另一个数组中的对象: var a = [{ id: 15 }, { id: -1 }, { id: 0 }, { id: 3 }, { id: 1 ...

  3. 从一个数组中找出 N 个数,其和为 M 的所有可能--最 nice 的解法

    比起讨论已经存在的大牛,我们更希望有更多有潜力的前端小伙伴成为大牛,只有这样,前端在未来才能够持续不断的发光发热. 故事的背景 这是一个呆萌炫酷吊炸天的前端算法题,曾经乃至现在也是叱咤风云在各个面试场 ...

  4. c++如何输入数组_从一个数组中找出 N 个数,其和为 M 的所有可能最 nice 的解法...

    编者按:本文由前端狂想录公众号授权奇舞周刊转载. 故事的背景 这是一个呆萌炫酷吊炸天的前端算法题,曾经乃至现在也是叱咤风云在各个面试场景中. 可以这样说,有 90% 以上的前端工程师不会做这个题目. ...

  5. 快速找出一个数组中的两个数字,让这两个数字之和等于一个给定的值

    我觉得写得很清晰,希望没有侵犯作者的著作权,原文地址http://blog.csdn.net/hackbuteer1/article/details/6699642 快速找出一个数组中的两个数字,让这 ...

  6. python怎么读取csv的一部分数据_python批量读取csv文件 如何用python将csv文件中的数据读取成数组...

    如何用python把多个csv文件数据处理后汇总到新csv文件你看这月光多温柔,小编转头还能看见你,一切从未坍塌. 可以用pandas读取数据,首先把文件方同一个文件价里,然后对当前文件价的所有内容循 ...

  7. java数组输入一个实数_用java!!输入五个数,保存到一个数组中,然后将... C语言,编写一个程序,从键盘输入5个数,算出总和......

    导航:网站首页 > 用java!!输入五个数,保存到一个数组中,然后将... C语言,编写一个程序,从键盘输入5个数,算出总和... 用java!!输入五个数,保存到一个数组中,然后将... C ...

  8. 算法题:“找出单身狗”--找出一个数组中只出现一次的数字

    题目:一个数组中只有两个数字是出现一次,其他所有数字都出现了两次. 编写一个函数找出这两个只出现一次的数字. 解题过程以及思路:(思路在代码中以注释形式给出) //一个数组中只有两个数字是出现一次,其 ...

  9. 改进,从一个数组中找出 N 个数,其和为 M 的所有可能

    特此说明,本文算法改自于<从一个数组中找出 N 个数,其和为 M 的所有可能--最 nice 的解法>一文.本文不同的是,采用二进制正序表示法,这种实现思路更直观.更简单些. 问题 从一个 ...

最新文章

  1. Nat. Commun. | 条件GAN网络和基因表达特征用于类苗头化合物的发现
  2. [转]python的requests发送/上传多个文件
  3. Linux查看网卡状态
  4. 收集:搜罗或看到的搞笑桥段
  5. python利器怎么编程-Python实现翻译小工具!几行代码搞定!装逼利器有没有!
  6. 经典C语言程序100例之八五
  7. nginx 知识点 :ctx_index and index
  8. 气象背景场_“把脉”风雨 服务为民——甘肃研究型气象预报业务体系发展扫描...
  9. partition oracle用法,Oracle partition by 使用说明
  10. centos6.5装mysql好难_CentOS6.5 下MySQL傻瓜式安装
  11. linux命令之history命令
  12. 三星s3android wear,三星galaxy wearable下载
  13. Error: 未绑定为第三方平台的开发小程序
  14. java nio web,JavaWeb之三——网络IO和NIO
  15. 史陶比尔staubli机器人手柄控制器维修操作屏修理
  16. Java||求集合数组中的中位数
  17. arduion-step motor 28byj-48步进电机
  18. 三大世界级难题,等你来解答
  19. 关于asc、txt格式到pcd、ply格式数据转换
  20. 化妆行业网站建设方案

热门文章

  1. OpenCV中的reshape
  2. 问题十:【总结】解决了问题四~问题九,vec3这个类的代码应该都能看懂了
  3. 机器学习笔记-XGBoost
  4. 如何破解物联网卡带来的连接痛点
  5. python用于标识类方法的是_Python类的设计与使用
  6. 图像处理、分析与机器视觉(基于labview)_基于3D技术的机器视觉解决方案
  7. mysql 分表后如何扩展_MySQL横向扩展-分库分表解决方案总结
  8. oracle约束 1或0,Oracle笔记(十) 约束
  9. 计算机二级office试题27答案,2017年12月计算机二级MS Office习题答案(一)
  10. java第七章第九题_Java2程序设计基础第七章课后习题