Spark案例之流量统计(三种方法)
数据集
1,2020-02-18 14:20:30,2020-02-18 14:46:30,20
1,2020-02-18 14:47:20,2020-02-18 15:20:30,30
1,2020-02-18 15:37:23,2020-02-18 16:05:26,40
1,2020-02-18 16:06:27,2020-02-18 17:20:49,50
1,2020-02-18 17:21:50,2020-02-18 18:03:27,60
2,2020-02-18 14:18:24,2020-02-18 15:01:40,20
2,2020-02-18 15:20:49,2020-02-18 15:30:24,30
2,2020-02-18 16:01:23,2020-02-18 16:40:32,40
2,2020-02-18 16:44:56,2020-02-18 17:40:52,50
3,2020-02-18 14:39:58,2020-02-18 15:35:53,20
3,2020-02-18 15:36:39,2020-02-18 15:24:54,30
需求:统计每个用户各时间段(相隔小于10分钟的两端上网时间需要合并在一起)所用的总流量
方法1 使用RDD算子
import java.text.SimpleDateFormatimport org.apache.spark.rdd.RDD
import org.apache.spark.{Partitioner, SPARK_BRANCH, SparkConf, SparkContext}import scala.collection.mutable/*** @Auther Zhang* @Date 2020/8/14*/
object FloatCount2 {def main(args: Array[String]): Unit = {val conf = new SparkConf().setAppName(this.getClass.getCanonicalName).setMaster("local[*]")val sc = new SparkContext(conf)val rdd = sc.textFile("data/data.csv")//处理数据val splited: RDD[((String, Long, Long, Long), Null)] = rdd.mapPartitions(it => {//因为要处理日期格式的数据,所以需要new SimpleDateFormatval sdf = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss")it.map(line => {val sp = line.split(",")val uid = sp(0)val startTime = sp(1)val endTime = sp(2)val flow = sp(3).toLong//将字符串格式的日期转换为Date格式的日期val startTimeStamp = sdf.parse(startTime).getTimeval endTimeStamp = sdf.parse(endTime).getTime((uid, startTimeStamp, endTimeStamp, flow), null)})})//计算用户id,返回一个数组val uids: Array[String] = splited.map(_._1._1).distinct().collect()implicit val sorter = Ordering[Long].on[(String, Long, Long, Long)](t => t._2)//搞一个自定义分区器,按uid分区//repartitionAndSortWithPartitions 不仅可以自定义分区,还能排序val partitioned: RDD[((String, Long, Long, Long), Null)] = splited.repartitionAndSortWithinPartitions(new UidAndPartitioner(uids))val grouped: RDD[((String, Int), Iterable[(Long, Long, Long)])] = partitioned.mapPartitions(it => {//这里要计算两端时间是否是在相隔没有超过10分钟,// 其实就是要用下一条数据的startTime 减去上一条的endTime,经过转换后看是否大于10分钟// 要处理第一条时间,同时也需要个中间值(用来存放每次上一条的endTime),所以需要定义一个temp(long类型)//flag是用来标记是否大于10分钟的,若大于10分钟,标记1,没有则是0//最后将flag累计,前面所有例至当前列这样的加,会得到不同的结果,结果相同的就是可以划定为一个时间段(没超过10分钟)的多条数据var temp = 0Lvar flag = 0it.map(x => {//获取起始时间val startTimeStamp = x._1._2//获取结束时间val endTimeStamp = x._1._3if (temp != 0) {if ((startTimeStamp - temp) / 1000 / 60 > 10) {flag += 1} else {flag += 0}}//当temp == 0时,上面的if语句没有被执行,也就是处理第一条数据时,直接将第一条数据的endtime赋值给了temp//然后处理第二条数据时,判断temp!=0的条件成立,所以用startTime-temp,也就是用第二条数据的startTime-第一条的endTime,完美//处理完第二条后,temp又重新赋值,将第二条的endTime赋值给自己,不停的循环往复temp = endTimeStamp((x._1._1, flag), (x._1._2, x._1._3, x._1._4))})}).groupByKey()//处理最后结果,这里用了mapValuesval result: RDD[(String, Long, Long, Long)] = grouped.mapValues(it => {//先将迭代器转换为List,然后按照时间排序val tuples: List[(Long, Long, Long)] = it.toList.sortBy(_._1)//取出开始的时间val startTimeStamp = tuples.head._1//取出结束的时间val endTimeStamp = tuples.last._2//求出这段时间的所耗费的总流量val totalflow: Long = tuples.map(x => {x._3}).reduce(_ + _)(startTimeStamp, endTimeStamp, totalflow)}).map(x => {//重新map,不要flag那个标记,只保留uid,startTimeStamp,endTimeStamp,totalFlow(x._1._1, x._2._1, x._2._2, x._2._3)})val buffer = result.collect().toBufferprintln(buffer)}}class UidAndPartitioner(uids: Array[String]) extends Partitioner {//在主构造器中定义分区规则(一个用户id一个分区)private val uidAndNum = new mutable.HashMap[String, Int]()var index = 0for (elem <- uids) {uidAndNum(elem) = indexindex += 1}//分区的个数override def numPartitions: Int = uids.length//获取分区id的方法override def getPartition(key: Any): Int = {val uid = key.asInstanceOf[(String, Long, Long, Long)]._1uidAndNum(uid)}
}
上述代码可以优化,就是在处理最后结果时,可以这么操作
val res = uidAndFlag.reduceByKey((t1, t2) => {(Math.min(t1._1, t2._1), Math.max(t1._2, t2._2), t1._3 + t2._3)}).mapPartitions(it => {val dateFormat = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss")it.map(t => {(t._1._1, dateFormat.format(new Date(t._2._1)), dateFormat.format(new Date(t._2._2)), t._2._3)})}).collect()println(res.toBuffer)
使用reduceByKey
算子提高效率,reduceByKey会 先局部使用传入的函数逻辑,再全局使用传入的逻辑,这里就是先局部先两两比较,判断哪个startTime最早Math.min(t1._1, t2._1
,判断哪个endTime最晚Math.max(t1._2, t2._2)
,局部统计耗费的流量t1._3 + t2._3
,然后再全局统计。
小结:
1.案例涉及到对时间格式数据的处理,需要SimpleDateFormat类帮助,比如字符串格式的时间和Date格式的时间相互转换以便计算
2.案例最难的部分就是判断两条数据是否间隔超过10分钟,这里的逻辑最难写(和判断连续n天做xxx有异曲同工之妙,连续n天的案例需要按照用户分组,并按照时间排序, 然后依次打上一个number做标记, 用时间减去这个number标记得到一个新的时间, 时间相同的就是连续的),这里的案例是运用了flag做标记,超过10分钟就标记1,没有就标记0,然后起始行到当前行的累加,得到结果相同的就是要划分到同一个时间段的数据。
方法2 使用SparkSql
import org.apache.spark.sql.{DataFrame, SparkSession}/*** @Auther Zhang* @Date 2020/8/19*/
object SQLflow {def main(args: Array[String]): Unit = {val spark = SparkSession.builder().appName(this.getClass.getCanonicalName).master("local[*]").getOrCreate()//spark读取csv数据val df: DataFrame = spark.read.option("inferSchema",true).csv("data/data.csv").toDF("uid","start_time","end_time","flow")df.createTempView("v_uid_flow")spark.sql("""SELECT| uid,| MIN(start_time) start_time,| MAX(end_time) end_time,| SUM(flow) total_flow|FROM| (SELECT| uid,| start_time,| end_time,| flow,| SUM(flag) OVER(PARTITION BY uid ORDER BY start_time) cn| FROM| (SELECT| uid,| start_time,| end_time,| flow,| IF((UNIX_TIMESTAMP(start_time)- UNIX_TIMESTAMP(lag_time)) / 60 > 10 ,1,0) flag| FROM| (SELECT| uid,| start_time,| end_time,| flow,| LAG(end_time,1,start_time) OVER(PARTITION BY uid ORDER BY start_time) lag_time| FROM| v_uid_flow) v1)v2)v3|GROUP BY uid,cn|""".stripMargin).show()}}
方法3 使用DSL
import org.apache.spark.sql.expressions.Window
import org.apache.spark.sql.{DataFrame, SparkSession}/*** @Auther Zhang* @Date 2020/8/19*/
object DSLFlow {def main(args: Array[String]): Unit = {val spark = SparkSession.builder().appName(this.getClass.getCanonicalName).master("local[*]").getOrCreate()//spark读取csv数据val df: DataFrame = spark.read.option("inferSchema", true).csv("data/data.csv").toDF("uid", "start_time", "end_time", "flow")import spark.implicits._import org.apache.spark.sql.functions._df.select($"uid",$"start_time",$"end_time",$"flow",expr("lag(end_time,1,start_time)") over(Window.partitionBy($"uid").orderBy($"start_time")) as "lag_time").select($"uid",$"start_time",$"end_time",$"flow",expr("IF((UNIX_TIMESTAMP(start_time)- UNIX_TIMESTAMP(lag_time)) / 60 > 10 ,1,0) flag")).select($"uid",$"start_time",$"end_time",$"flow",sum($"flag") over(Window.partitionBy($"uid").orderBy($"start_time")) as "fid").groupBy($"uid",$"fid").agg(min($"start_time") as "start_time",max($"end_time") as "end_time",sum($"flow") as "total_flow").drop($"fid").show()}}
Spark案例之流量统计(三种方法)相关推荐
- 统计内表行数常用的三种方法
以下是统计内表行数常用的三种方法, 如下: 1.LOOP AT it_itab. g_lines2 = g_lines2 + 1. ENDLOOP. 该方法是通过循环内部表自己累 ...
- Spark创建DataFrame的三种方法
跟关系数据库的表(Table)一样,DataFrame是Spark中对带模式(schema)行列数据的抽象.DateFrame广泛应用于使用SQL处理大数据的各种场景.创建DataFrame有很多种方 ...
- 流量节省模式 Android,这三种方法让你节省更多手机上网流量
大多数手机用户应该都知道最基本的流量管理方式--没有Wi-Fi可用时就不看视频.不玩在线游戏.或者下载大型应用--除非你拥有不限流量套餐.但如果你在使用流量时已经很节约,但每个月的用量依然会超出流量套 ...
- shell统计每一行字符数的三种方法
shell统计每一行字符数的三种方法 $cat test.txt SlNAC2 ,SlNAC2_2 ,SlNAC2_2 ##### 方法一: $awk -F "" '{print ...
- SpringBoot三种方法实现定时发送邮件的案例
介绍 这里是小编成长之路的历程,也是小编的学习之路.希望和各位大佬们一起成长! 以下为小编最喜欢的两句话: 要有最朴素的生活和最遥远的梦想,即使明天天寒地冻,山高水远,路远马亡. 一个人为什么要努力? ...
- 【小案例】字符串转换为数字数据类型的三种方法
<script>var string="88.96"; var num=string*10; console.log(string,num,typeof num);&l ...
- 数据分析常用三种方法
数据分析常用三种方法:趋势分析.对比分析.细分分析 1. 趋势分析 趋势分析般而言,适用于产品核心指标的长期跟踪,比如,点击率,GMV,活跃用户数等.做出简单的数据趋势图,并不算是趋势分析,趋势分析更 ...
- 了解宇宙万物的“第三种方法”,人工智能正在改变科学
来源:quantamagazine 编辑:张卓骏.luciana.笪洁琼.Aileen 当今物理和天文实验所产生的海量信息,没有任何一个人或者团队可以完整的处理. 有些实验数据每天以千兆字节的规模在增 ...
- 独家 | 使EfficientNet更有效率的三种方法(附链接)
作者:Dominic Masters翻译:王可汗校对:欧阳锦本文约3300字,建议阅读5分钟本文为大家介绍了提升EffcientNet效率和性能的三个策略. 在实践中有更好性能的EfficientNe ...
- 猫脸关键点检测大赛:三种方法,轻松实现猫脸识别!
导语:挑战猫脸,就差你了! 今天这个比赛,得从一个做程序猿的铲屎官开始说起...... 话说,有一天「铲屎猿」早起之后,发现猫主子竟然没了身影:他找啊找啊,找了好久,可仍然到处都没找到猫主子.这时,客 ...
最新文章
- 002_生活口语积累
- 解决Unity中新导入了C#文件在vs打开一直都是杂项文件的问题
- js base64编码解码 btoa atob 函数简介
- 论java中可变参数
- toast, 警告窗
- bzoj 1702: [Usaco2007 Mar]Gold Balanced Lineup 平衡的队列
- HDU3501——欧拉函数裸题
- C#网络编程之面向连接的套接字
- 外螺纹对照表_最新英制螺纹对照表(2016年完整版).
- [04-07]最新精选绿色软件每日更新(小熊整理)
- WEB 系统架构演变
- 分位数回归(quantile regression)R实现
- 电脑使用小常识(4):让win10强制更新棍淡
- ASP.NET WEBAPI实现微信接入验证
- Java简单知识点小结
- H5页面展示丨网页三维展示丨产品3D展示原理【商迪3D】
- 惠普打印机墨盒更换教程_惠普打印机加墨教程:老司机教你
- 负数二进制表示的方式及原因
- python 携程_python-携程爬虫
- 即日起,申请美国签证需要提供社交媒体账户