数据集

1,2020-02-18 14:20:30,2020-02-18 14:46:30,20
1,2020-02-18 14:47:20,2020-02-18 15:20:30,30
1,2020-02-18 15:37:23,2020-02-18 16:05:26,40
1,2020-02-18 16:06:27,2020-02-18 17:20:49,50
1,2020-02-18 17:21:50,2020-02-18 18:03:27,60
2,2020-02-18 14:18:24,2020-02-18 15:01:40,20
2,2020-02-18 15:20:49,2020-02-18 15:30:24,30
2,2020-02-18 16:01:23,2020-02-18 16:40:32,40
2,2020-02-18 16:44:56,2020-02-18 17:40:52,50
3,2020-02-18 14:39:58,2020-02-18 15:35:53,20
3,2020-02-18 15:36:39,2020-02-18 15:24:54,30

需求:统计每个用户各时间段(相隔小于10分钟的两端上网时间需要合并在一起)所用的总流量

方法1 使用RDD算子


import java.text.SimpleDateFormatimport org.apache.spark.rdd.RDD
import org.apache.spark.{Partitioner, SPARK_BRANCH, SparkConf, SparkContext}import scala.collection.mutable/*** @Auther Zhang* @Date 2020/8/14*/
object FloatCount2 {def main(args: Array[String]): Unit = {val conf = new SparkConf().setAppName(this.getClass.getCanonicalName).setMaster("local[*]")val sc = new SparkContext(conf)val rdd = sc.textFile("data/data.csv")//处理数据val splited: RDD[((String, Long, Long, Long), Null)] = rdd.mapPartitions(it => {//因为要处理日期格式的数据,所以需要new SimpleDateFormatval sdf = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss")it.map(line => {val sp = line.split(",")val uid = sp(0)val startTime = sp(1)val endTime = sp(2)val flow = sp(3).toLong//将字符串格式的日期转换为Date格式的日期val startTimeStamp = sdf.parse(startTime).getTimeval endTimeStamp = sdf.parse(endTime).getTime((uid, startTimeStamp, endTimeStamp, flow), null)})})//计算用户id,返回一个数组val uids: Array[String] = splited.map(_._1._1).distinct().collect()implicit val sorter = Ordering[Long].on[(String, Long, Long, Long)](t => t._2)//搞一个自定义分区器,按uid分区//repartitionAndSortWithPartitions 不仅可以自定义分区,还能排序val partitioned: RDD[((String, Long, Long, Long), Null)] = splited.repartitionAndSortWithinPartitions(new UidAndPartitioner(uids))val grouped: RDD[((String, Int), Iterable[(Long, Long, Long)])] = partitioned.mapPartitions(it => {//这里要计算两端时间是否是在相隔没有超过10分钟,// 其实就是要用下一条数据的startTime 减去上一条的endTime,经过转换后看是否大于10分钟// 要处理第一条时间,同时也需要个中间值(用来存放每次上一条的endTime),所以需要定义一个temp(long类型)//flag是用来标记是否大于10分钟的,若大于10分钟,标记1,没有则是0//最后将flag累计,前面所有例至当前列这样的加,会得到不同的结果,结果相同的就是可以划定为一个时间段(没超过10分钟)的多条数据var temp = 0Lvar flag = 0it.map(x => {//获取起始时间val startTimeStamp = x._1._2//获取结束时间val endTimeStamp = x._1._3if (temp != 0) {if ((startTimeStamp - temp) / 1000 / 60 > 10) {flag += 1} else {flag += 0}}//当temp == 0时,上面的if语句没有被执行,也就是处理第一条数据时,直接将第一条数据的endtime赋值给了temp//然后处理第二条数据时,判断temp!=0的条件成立,所以用startTime-temp,也就是用第二条数据的startTime-第一条的endTime,完美//处理完第二条后,temp又重新赋值,将第二条的endTime赋值给自己,不停的循环往复temp = endTimeStamp((x._1._1, flag), (x._1._2, x._1._3, x._1._4))})}).groupByKey()//处理最后结果,这里用了mapValuesval result: RDD[(String, Long, Long, Long)] = grouped.mapValues(it => {//先将迭代器转换为List,然后按照时间排序val tuples: List[(Long, Long, Long)] = it.toList.sortBy(_._1)//取出开始的时间val startTimeStamp = tuples.head._1//取出结束的时间val endTimeStamp = tuples.last._2//求出这段时间的所耗费的总流量val totalflow: Long = tuples.map(x => {x._3}).reduce(_ + _)(startTimeStamp, endTimeStamp, totalflow)}).map(x => {//重新map,不要flag那个标记,只保留uid,startTimeStamp,endTimeStamp,totalFlow(x._1._1, x._2._1, x._2._2, x._2._3)})val buffer = result.collect().toBufferprintln(buffer)}}class UidAndPartitioner(uids: Array[String]) extends Partitioner {//在主构造器中定义分区规则(一个用户id一个分区)private val uidAndNum = new mutable.HashMap[String, Int]()var index = 0for (elem <- uids) {uidAndNum(elem) = indexindex += 1}//分区的个数override def numPartitions: Int = uids.length//获取分区id的方法override def getPartition(key: Any): Int = {val uid = key.asInstanceOf[(String, Long, Long, Long)]._1uidAndNum(uid)}
}

上述代码可以优化,就是在处理最后结果时,可以这么操作

 val res = uidAndFlag.reduceByKey((t1, t2) => {(Math.min(t1._1, t2._1), Math.max(t1._2, t2._2), t1._3 + t2._3)}).mapPartitions(it => {val dateFormat = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss")it.map(t => {(t._1._1, dateFormat.format(new Date(t._2._1)), dateFormat.format(new Date(t._2._2)), t._2._3)})}).collect()println(res.toBuffer)

使用reduceByKey算子提高效率,reduceByKey会 先局部使用传入的函数逻辑,再全局使用传入的逻辑,这里就是先局部先两两比较,判断哪个startTime最早Math.min(t1._1, t2._1,判断哪个endTime最晚Math.max(t1._2, t2._2),局部统计耗费的流量t1._3 + t2._3,然后再全局统计。

小结:
1.案例涉及到对时间格式数据的处理,需要SimpleDateFormat类帮助,比如字符串格式的时间和Date格式的时间相互转换以便计算
2.案例最难的部分就是判断两条数据是否间隔超过10分钟,这里的逻辑最难写(和判断连续n天做xxx有异曲同工之妙,连续n天的案例需要按照用户分组,并按照时间排序, 然后依次打上一个number做标记, 用时间减去这个number标记得到一个新的时间, 时间相同的就是连续的),这里的案例是运用了flag做标记,超过10分钟就标记1,没有就标记0,然后起始行到当前行的累加,得到结果相同的就是要划分到同一个时间段的数据。

方法2 使用SparkSql

import org.apache.spark.sql.{DataFrame, SparkSession}/*** @Auther Zhang* @Date 2020/8/19*/
object SQLflow {def main(args: Array[String]): Unit = {val spark = SparkSession.builder().appName(this.getClass.getCanonicalName).master("local[*]").getOrCreate()//spark读取csv数据val df: DataFrame = spark.read.option("inferSchema",true).csv("data/data.csv").toDF("uid","start_time","end_time","flow")df.createTempView("v_uid_flow")spark.sql("""SELECT| uid,| MIN(start_time) start_time,| MAX(end_time) end_time,| SUM(flow) total_flow|FROM| (SELECT|  uid,|  start_time,|  end_time,|  flow,|  SUM(flag) OVER(PARTITION BY uid ORDER BY start_time) cn| FROM|  (SELECT|   uid,|   start_time,|   end_time,|   flow,|   IF((UNIX_TIMESTAMP(start_time)- UNIX_TIMESTAMP(lag_time)) / 60 > 10 ,1,0) flag|  FROM|   (SELECT|    uid,|    start_time,|    end_time,|    flow,|    LAG(end_time,1,start_time) OVER(PARTITION BY uid ORDER BY start_time) lag_time|   FROM|    v_uid_flow) v1)v2)v3|GROUP BY uid,cn|""".stripMargin).show()}}

方法3 使用DSL


import org.apache.spark.sql.expressions.Window
import org.apache.spark.sql.{DataFrame, SparkSession}/*** @Auther Zhang* @Date 2020/8/19*/
object DSLFlow {def main(args: Array[String]): Unit = {val spark = SparkSession.builder().appName(this.getClass.getCanonicalName).master("local[*]").getOrCreate()//spark读取csv数据val df: DataFrame = spark.read.option("inferSchema", true).csv("data/data.csv").toDF("uid", "start_time", "end_time", "flow")import spark.implicits._import org.apache.spark.sql.functions._df.select($"uid",$"start_time",$"end_time",$"flow",expr("lag(end_time,1,start_time)") over(Window.partitionBy($"uid").orderBy($"start_time")) as "lag_time").select($"uid",$"start_time",$"end_time",$"flow",expr("IF((UNIX_TIMESTAMP(start_time)- UNIX_TIMESTAMP(lag_time)) / 60 > 10 ,1,0) flag")).select($"uid",$"start_time",$"end_time",$"flow",sum($"flag") over(Window.partitionBy($"uid").orderBy($"start_time")) as "fid").groupBy($"uid",$"fid").agg(min($"start_time") as "start_time",max($"end_time") as "end_time",sum($"flow") as "total_flow").drop($"fid").show()}}

Spark案例之流量统计(三种方法)相关推荐

  1. 统计内表行数常用的三种方法

    以下是统计内表行数常用的三种方法, 如下:   1.LOOP AT it_itab.    g_lines2 = g_lines2 + 1.    ENDLOOP.    该方法是通过循环内部表自己累 ...

  2. Spark创建DataFrame的三种方法

    跟关系数据库的表(Table)一样,DataFrame是Spark中对带模式(schema)行列数据的抽象.DateFrame广泛应用于使用SQL处理大数据的各种场景.创建DataFrame有很多种方 ...

  3. 流量节省模式 Android,这三种方法让你节省更多手机上网流量

    大多数手机用户应该都知道最基本的流量管理方式--没有Wi-Fi可用时就不看视频.不玩在线游戏.或者下载大型应用--除非你拥有不限流量套餐.但如果你在使用流量时已经很节约,但每个月的用量依然会超出流量套 ...

  4. shell统计每一行字符数的三种方法

    shell统计每一行字符数的三种方法 $cat test.txt SlNAC2 ,SlNAC2_2 ,SlNAC2_2 ##### 方法一: $awk -F "" '{print ...

  5. SpringBoot三种方法实现定时发送邮件的案例

    介绍 这里是小编成长之路的历程,也是小编的学习之路.希望和各位大佬们一起成长! 以下为小编最喜欢的两句话: 要有最朴素的生活和最遥远的梦想,即使明天天寒地冻,山高水远,路远马亡. 一个人为什么要努力? ...

  6. 【小案例】字符串转换为数字数据类型的三种方法

    <script>var string="88.96"; var num=string*10; console.log(string,num,typeof num);&l ...

  7. 数据分析常用三种方法

    数据分析常用三种方法:趋势分析.对比分析.细分分析 1. 趋势分析 趋势分析般而言,适用于产品核心指标的长期跟踪,比如,点击率,GMV,活跃用户数等.做出简单的数据趋势图,并不算是趋势分析,趋势分析更 ...

  8. 了解宇宙万物的“第三种方法”​,人工智能正在改变科学

    来源:quantamagazine 编辑:张卓骏.luciana.笪洁琼.Aileen 当今物理和天文实验所产生的海量信息,没有任何一个人或者团队可以完整的处理. 有些实验数据每天以千兆字节的规模在增 ...

  9. 独家 | 使EfficientNet更有效率的三种方法(附链接)

    作者:Dominic Masters翻译:王可汗校对:欧阳锦本文约3300字,建议阅读5分钟本文为大家介绍了提升EffcientNet效率和性能的三个策略. 在实践中有更好性能的EfficientNe ...

  10. 猫脸关键点检测大赛:三种方法,轻松实现猫脸识别!

    导语:挑战猫脸,就差你了! 今天这个比赛,得从一个做程序猿的铲屎官开始说起...... 话说,有一天「铲屎猿」早起之后,发现猫主子竟然没了身影:他找啊找啊,找了好久,可仍然到处都没找到猫主子.这时,客 ...

最新文章

  1. 002_生活口语积累
  2. 解决Unity中新导入了C#文件在vs打开一直都是杂项文件的问题
  3. js base64编码解码 btoa atob 函数简介
  4. 论java中可变参数
  5. toast, 警告窗
  6. bzoj 1702: [Usaco2007 Mar]Gold Balanced Lineup 平衡的队列
  7. HDU3501——欧拉函数裸题
  8. C#网络编程之面向连接的套接字
  9. 外螺纹对照表_最新英制螺纹对照表(2016年完整版).
  10. [04-07]最新精选绿色软件每日更新(小熊整理)
  11. WEB 系统架构演变
  12. 分位数回归(quantile regression)R实现
  13. 电脑使用小常识(4):让win10强制更新棍淡
  14. ASP.NET WEBAPI实现微信接入验证
  15. Java简单知识点小结
  16. H5页面展示丨网页三维展示丨产品3D展示原理【商迪3D】
  17. 惠普打印机墨盒更换教程_惠普打印机加墨教程:老司机教你
  18. 负数二进制表示的方式及原因
  19. python 携程_python-携程爬虫
  20. 即日起,申请美国签证需要提供社交媒体账户

热门文章

  1. sd卡计算机无法读取数据,电脑突然无法读取sd的起因分析sd卡无法读取怎么修复...
  2. 2020计算机专业保研夏令营面经:中科院计算所网数机试题目
  3. 以太坊基础---奖励
  4. 安装office2013报安装程序找不到OneNote.zh-cn、安装源不存在
  5. 团队任务3:第一次冲刺(Alpha)
  6. 在Java中为JFrame添加背景音乐
  7. 中国诗词大会第四季第一场
  8. python爬虫总结,看这篇就够了
  9. 介绍一个使用go写的TUI性能监测工具gotop
  10. 解读arduino读取模拟信号实例