文章目录

  • 用例1:数据清洗
  • 用例2:用户留存分析
  • 用例3:活跃用户分析
  • 用例4:活跃用户地域信息分析
  • 用例5:用户浏览深度分析

本项目用到的文件获取如下,提取码: 6xdx
点我获取文件
注意:本文都是在spark-shell环境下完成

用例1:数据清洗

读入日志文件并转化为RDD[Row]类型

  • 按照Tab切割数据
  • 过滤掉字段数量少于8个的
import org.apache.spark.sql.Row
import org.apache.spark.sql.types.{StructType, StructField, StringType}
val schemaString = "event_time url method status sip user_uip action_prepend action_client"
val fields = schemaString.split("\\s+").map(fieldName => StructField(fieldName, StringType, nullable = true))
val schema = StructType(fields)
val rdd = sc.textFile("file:///data/test.log").map(_.split("\t")).filter(_.size==8)
val rowRDD = rdd.map(x=>Row(x(0), x(1),x(2),x(3), x(4),x(5),x(6), x(7)))
val df = spark.createDataFrame(rowRDD,schema)

对数据进行清洗

日志拆分字段:
event_time
url
method
status
sip
user_uip
action_prepend
action_client
  • 按照第一列和第二列对数据进行去重
  • 过滤掉状态码非200
  • 过滤掉event_time为空的数据
  • 将url按照”&”以及”=”切割

保存数据: 将数据写入mysql表中

//按照第一列和第二列对数据进行去重 使用 df的dropDuplicates方法
//过滤掉状态码非200
//过滤掉event_time为空的数据
val ds = df.dropDuplicates("event_time","url").filter(x=>x(3)=="200").filter(x=>x(0).toString().trim !=null && x(0).toString().trim!="")
//将url 按照 ? 进行分割, 取出第二段并按照& 进行分割, 分割出每一组参数,最后按照= 切割, 变成键值对的形式转化成Map集合
val ds2 = ds.map(x=>{val s = x.getAs[String]("url").split("\\?")(1);val m= s.split("&").map(_.split("=")).filter(_.size==2).map(x=>(x(0),x(1))).toMap;(x.getAs[String]("event_time"),m.getOrElse("userUID", ""),m.getOrElse("userSID", ""),m.getOrElse("actionBegin", ""),m.getOrElse("actionEnd", ""),m.getOrElse("actionType", ""), m.getOrElse("actionName", ""),m.getOrElse("actionValue", ""), m.getOrElse("actionTest", ""),m.getOrElse("ifEquipment", ""),x.getAs[String]("method"),x.getAs[String]("status"),x.getAs[String]("sip"),x.getAs[String]("user_uip"),x.getAs[String]("action_prepend"),x.getAs[String]("action_client"))})
val df2 = ds2.toDF("event_time", "user_uid", "user_sid", "action_begin","action_end", "action_type", "action_name", "action_value","action_test", "if_equipment", "method", "status", "sip","user_uip", "action_prepend", "action_client")
//保存数据,将数据写入mysql表中
val url = "jdbc:mysql://localhost:3306/test"
val prop = new java.util.Properties
prop.setProperty("user", "root")
prop.setProperty("password", "sunyong")
prop.setProperty("driver","com.mysql.jdbc.Driver")
df2.write.mode("overwrite").jdbc(url,"logs",prop)

用例2:用户留存分析

计算用户的次日留存率

  • 求当天新增用户总数n
  • 求当天新增的用户ID与次日登录的用户ID的交集,得出新增用户次日登录总数m (次日留存数)
  • m/n*100%

计算用户的次周留存率

val logs = spark.read.jdbc(url,"logs2",prop)
logs.cache()
// 1.将action_name=='Registered'抽取为一张注册表
val registered = logs.filter($"action_name" === "Registered").withColumnRenamed("event_time","register_time").select("user_uid","register_time")
// 2.将action_name=='Signin'抽取为一张登陆表
val signin = logs.filter($"action_name" === "Signin").withColumnRenamed("event_time","signin_time").select("user_uid","signin_time")
// 3.将注册表和登陆表通过user_sid进行join操作
val joined = registered.join(signin, registered("user_uid") === signin("user_uid"), "left").drop(signin("user_uid"))
joined.createOrReplaceTempView("j")
// 将时间取前10位并转化为时间戳
val register2signin = spark.sql("select user_uid,register_time,signin_time,unix_timestamp(substr(register_time,1,10),'yyyy-MM-dd')registered_date ,unix_timestamp(substr(signin_time,1,10),'yyyy-MM-dd') signin_date from j")
// 4.若统计次日留存则过滤出来日期相差天数为1的, 若统计七日留存则过滤出来日期相差天数为7的
// 5.对日期进行分组操作,在进行累加求和即可得出次日留存和七日留存
// 次日留存
val day_retention = register2signin.filter($"registered_date" === $"signin_date" - 86400).groupBy($"registered_date",$"user_uid").count().groupBy("registered_date").count()
// 七日留存
val week_retenetion = register2signin.filter($"registered_date" === $"signin_date" - 604800).groupBy("registered_date","user_uid").count().groupBy("registered_date").count()
// 写入数据库
day_retention.write.mode("overwrite").jdbc(url, "day_retention", prop)//只有两天的数据 最后只有一条数据
week_retenetion.write.mode("overwrite").jdbc(url,"week_retention", prop)//只有两天数据,无法得到周留存,数据为空

用例3:活跃用户分析

统计分析需求

  • 读取数据库,统计每天的活跃用户数
  • 统计规则:有看课和买课行为的用户才属于活跃用户
  • 对UID进行去重
val logs = spark.read.jdbc(url,"logs2",prop)
// 1.将学习的和买课的日志过滤出来
// 2.将event_time 的字段的前十个字符取出来作为日期
// 3.按照用户id 去重
// 4.按照日期进行分组
// 5.统计人数
val activeUserCount= logs.filter($"action_name" === "StartLearn" || $"action_name" === "BuyCourse").map(x => {(x.getAs("user_uid").toString, x.getAs("event_time").toString.substring(0, 10))}).withColumnRenamed("_2", "date").distinct().groupBy("date").count().orderBy("date").cache()
// 写入mysql
activeUserCount.write.mode("overwrite").jdbc(url, "activeUserCount", prop)

用例4:活跃用户地域信息分析

统计分析需求

  • 读取原始日志数据
  • 解析url获取用户的访问IP
  • 通过IP库获得IP对应的省市区地址(读取文件转换后存入MySQL)
  • 求出每个地域人数的所占百分比
// 从mysql中读取数据
val url = "jdbc:mysql://localhost:3306/test"
val prop = new java.util.Properties
prop.setProperty("user", "root")
prop.setProperty("password", "sunyong")
prop.setProperty("driver","com.mysql.jdbc.Driver")
val logs = spark.read.jdbc(url, "logs2", prop).cache()
// 统计一下日志总条数
val cnt = logs.count()
// 注册udf 函数 1.ip 解析为整数形式   2.求占比
import org.apache.spark.sql.expressions.UserDefinedFunction
import org.apache.spark.sql.functions._
val rate: UserDefinedFunction = udf((x: Double) => x / cnt)
val ip2Int: UserDefinedFunction = udf((x:String)=>{val y = x.split("\\.");y(0).toLong*256*256*256+y(1).toLong*256*256+y(2).toLong*256+y(3).toLong})
//使用纯真IP地址查询工具生成ip 地址文件
//将IP数据文件转换成整数形式存入MySQL
case class IP(startip:String,endip:String,city:String,company:String)
val df = sc.textFile("file:///data/IP.txt").map(_.split("\\s+")).filter(_.size==4).map(x=>IP(x(0),x(1),x(2),x(3))).toDF
//转换成整数形式存到数据库中
val ip = df.withColumn("startIp_Int",ip2Int($"startip")).withColumn("endIp_Int",ip2Int($"endip")).drop("startip").drop("endip")
ip.write.mode("overwrite").jdbc(url, "ip", prop)
//读取ip数据 创建临时表
val ipDF = spark.read.jdbc(url, "ip", prop).cache()
ipDF.createOrReplaceTempView("ip")
//对日志数据进行处理 并创建临时表
val regionIp = logs.select("user_uid","user_uip").filter(x => x.getAs("user_uip").toString.size>1).withColumn("ip",ip2Int($"user_uip")).drop("user_uip")regionIp.createOrReplaceTempView("region")// 将ip数据表与日志临时表进行join然后计算
val grouped = spark.sql("select * from ip i join region r on r.ip between i.startIp_Int and endIp_Int").select("user_uid","city").groupBy("city", "user_uid").count().dropDuplicates("user_uid").groupBy("city").count()//最终结果存入数据库
grouped.orderBy(grouped("count").desc).withColumn("rate", rate(grouped.col("count"))).write.mode("overwrite").jdbc(url, "region", prop)
  • 结果如下图

用例5:用户浏览深度分析

统计分析需求

  • 读取日志信息,以天为计量单位,通过depth值来表示用户的浏览深度
  • 统计每个depth阶段的用户的个数,反映出每个url的访问人数,针对性的优化页面,来提高网站的转化率,对用户产生粘性
  • 计算规则:当前url的个数作为depth的值
    1)一个用户今天浏览了三个页面
    2)一个url今天被50个人访问
val logs = spark.read.jdbc(url,"logs2",prop)
// 1.过滤出action_prepend 字段的长度大于10的
// 2.将event_time 这一列的时间转换为日期
import org.apache.spark.sql.expressions.UserDefinedFunction
import org.apache.spark.sql.functions._
val time2date:UserDefinedFunction = udf((x:String)=>x.substring(0, 10))
val filtered = logs.filter(x => x.getAs("action_prepend").toString.length > 10).withColumn("date", time2date(logs.col("event_time"))).cache()
// 3.对"date", "user_sid", "action_prepend" 进行分组统计,之后再对"date", "count"进行分组统计.
// 4.统计url
// 统计用户浏览的url 的个数
val user_url = filtered.groupBy("date", "user_uid", "action_prepend").count().groupBy("date", "user_uid").count().orderBy("date", "count")
// 统计url被用户浏览次数
val url_count = filtered.groupBy("date", "action_prepend", "user_uid").count().groupBy("date", "action_prepend").count().orderBy("date", "count")
//写入数据库
user_url.write.mode("overwrite").jdbc(url, "user_url", prop)
url_count.write.mode("overwrite").jdbc(url, "url_count", prop)

Spark数据分析及处理相关推荐

  1. Spark数据分析实战:大型活动大规模人群的检测和疏散

    Spark数据分析实战:大型活动大规模人群的检测和疏散 2016-06-29 Hadoop技术博文 近日,风靡西雅图.旧金山的Datapalooza登陆上海(IBM Spark大赛启动 10万美元悬赏 ...

  2. 大数据hadoop,spark数据分析之 基于大数据平台的运营商在线服务系统设计

    今天向大家介绍一个帮助往届学生完成的毕业设计项目,大数据hadoop,spark数据分析之 基于大数据平台的运营商在线服务系统设计. 基于大数据平台的运营商在线服务系统设计 随着通信行业的业务拓展以及 ...

  3. 使用Relational Cache加速Spark数据分析

    本文转自云栖社区,作者:李呈祥(司麟) 文中的Spark为阿里云EMR产品的Spark,博主之前也考虑过类似的问题,受到了一些启发,所以转载分享一下. 背景 Cache被广泛应用于数据处理的各个领域和 ...

  4. Spark数据分析之第4课

    #音乐推荐和Audioscrobbler数据集 #1. 数据集 http://www-etud.iro.umontreal.ca/~bergstrj/audioscrobbler_data.html ...

  5. Spark数据分析之第5课

    对于http://blog.csdn.net/jiangshouzhuang/article/details/51550275中介绍的算法得出的推荐结果不怎么理想,下面进行进一步优化. #评价推荐质量 ...

  6. 基于电影爬虫及Spark数据分析可视化设计

    开发环境:PyCharm + Python3.7 + Spark + Idea + Mysql + Echart 由于我国经济的不断增长,人们的物质生活也在不断提升,因此越来越多的人观影需求增加以丰富 ...

  7. spark数据分析之ip归属地查询

    前一段时间,在项目中,领导要求实时查看来自各个省份的ip访问的详情,根据这一需求,通过flume/logstack实时采集nginx的日志到生产到kafka,再通过spark实时消费分析保存到redi ...

  8. Spark数据分析及处理_ELT

    使用Spark完成下列日志分析项目需求: 日志数据清洗 用户留存分析 活跃用户分析 import org.apache.commons.lang.StringUtils import org.apac ...

  9. ​终于有人把Spark大数据分析与挖掘讲明白了

    当我们每天面对扑面而来的海量数据时,是战斗还是退却,是去挖掘其中蕴含的无限资源,还是让它们自生自灭?我们的答案是:"一切都取决于你自己".对于海量而庞大的数据来说,在不同人眼里,既 ...

  10. Spark快速大数据分析——读书笔记

    --8.16开始整理 Spark快速大数据分析 推荐序: 一套大数据解决方案通常包含多个组件,从存储.计算和网络硬件层,到数据处理引擎,再到利用改良的统计和计算算法.数据可视化来获得商业洞见的分析层, ...

最新文章

  1. 如何在MFC中调用CUDA
  2. xxx cannot be resolved to a type 问题的几种常见原因
  3. imp导入时出现imp-00017 ora-06550的解决办法
  4. MatConvnet工具箱文档翻译理解(1)
  5. 从码奴到码神之路--初级到中级再到高级程序员的进化之路
  6. zoj3806Incircle and Circumcircle
  7. 微信小程序开发——点击按钮退出小程序的实现
  8. dategurd oracle_Oracle 时间和日期处理
  9. Grove-Lora Radio:修改库函数使能修改扩频因子、带宽参数、码率
  10. 试图将驱动程序添加到存储区_基于容器的块存储使用
  11. robocopy 备份_windows下使用RoboCopy命令进行文件夹增量备份
  12. python与施耐德plc通讯_施耐德PLC两种编程通讯控制实例分享
  13. 第七届DAMS中国数据智能管理峰会(上海站) - 文末俩惊喜
  14. 使用Python的basemap模块绘制地图的局部放大图(主图及放大的子图)
  15. 解决wps公式编辑器上移情况
  16. 一张图说明白数据安全管理体系的规划
  17. 关于动车:动车票假如象飞机票那样卖会如何?
  18. BC30 KiKi和酸奶
  19. App应用最有效的变现方式,还能同时提升留存!
  20. 62-Mybatis高级介绍

热门文章

  1. 计算机edp测试是什么测试,五、信息工具--(一)EDP:电子数据处理
  2. 2. evaluate-reverse-polish-notation
  3. 想了解Intel vPro(博锐)技术,就来Intel vPro虚拟展厅
  4. 区块链-网络安全的未来
  5. 电脑文件误删除怎么恢复?
  6. 用python实现监听微信撤回消息
  7. python进阶高级技能:Python退火算法在高次方程的应用
  8. html5表格数据加载,bootstrap table load加载数据到表格的方法
  9. (转载)C# Dictionary
  10. 关于win10开始菜单点击无反应解决方案