Spark DataFrameRDD案例实现
Spark DataFrame&RDD案例实现
- 1.DataFram类型
- 2.需求:将orders表中days_since_prior_order中的空置(" ")转换成0
- idea中快捷键
- 2.每个用户平均购买订单的间隔周期
- 3.每个用户的总订单数量(分组)
- 4.每个用户购买的product商品去重后的集合数据
- 4.每个用户总商品数量以及去重后的商品数量(distinct count)
- 需求:合并“去重后集合的数据”和“去重后的商品数量” 统计逻辑
- 5.每个用户购买的平均每个订单的商品数量(hive已经实现过了)
1.DataFram类型
针对NULL值可以转换成0
df.na.fill(0)
2.需求:将orders表中days_since_prior_order中的空置(" ")转换成0
val orders = spark.sql("select * from badou.orders")
val priors = spark.sql("select * from badou.priors")orders表
+--------+-------+--------+------------+---------+-----------------+----------------------+
|order_id|user_id|eval_set|order_number|order_dow|order_hour_of_day|days_since_prior_order|
+--------+-------+--------+------------+---------+-----------------+----------------------+
| 2539329| 1| prior| 1| 2| 08| |
| 2398795| 1| prior| 2| 3| 07| 15.0|
| 473747| 1| prior| 3| 3| 12| 21.0|
| 2254736| 1| prior| 4| 4| 07| 29.0|
| 431534| 1| prior| 5| 4| 15| 28.0|
+--------+-------+--------+------------+---------+-----------------+----------------------+
因为days_since_prior_order中有空值,所以要进行空值填充,可以利用selectExpr()
scala> orders.selectExpr("*","if(days_since_prior_order='',0.0 ,days_since_prior_order) as dspo").show(5)
+--------+-------+--------+------------+---------+-----------------+----------------------+----+
|order_id|user_id|eval_set|order_number|order_dow|order_hour_of_day|days_since_prior_order|dspo|
+--------+-------+--------+------------+---------+-----------------+----------------------+----+
| 2539329| 1| prior| 1| 2| 08| | 0.0|
| 2398795| 1| prior| 2| 3| 07| 15.0|15.0|
| 473747| 1| prior| 3| 3| 12| 21.0|21.0|
| 2254736| 1| prior| 4| 4| 07| 29.0|29.0|
| 431534| 1| prior| 5| 4| 15| 28.0|28.0|
+--------+-------+--------+------------+---------+-----------------+----------------------+----+
only showing top 5 rows
删除days_since_prior_order这列
scala> val orderNew = orders.selectExpr("*","if(days_since_prior_order='',0.0,days_since_prior_order) as dspo").drop("days_since_prior_order")
orderNew: org.apache.spark.sql.DataFrame = [order_id: string, user_id: string ... 5 more fields]scala> orderNew.show(5)
+--------+-------+--------+------------+---------+-----------------+----+
|order_id|user_id|eval_set|order_number|order_dow|order_hour_of_day|dspo|
+--------+-------+--------+------------+---------+-----------------+----+
| 2539329| 1| prior| 1| 2| 08| 0.0|
| 2398795| 1| prior| 2| 3| 07|15.0|
| 473747| 1| prior| 3| 3| 12|21.0|
| 2254736| 1| prior| 4| 4| 07|29.0|
| 431534| 1| prior| 5| 4| 15|28.0|
+--------+-------+--------+------------+---------+-----------------+----+
idea中快捷键
将多行合并成一场 ctrl shift + j
返回上一步操作 : ctrl + z
2.每个用户平均购买订单的间隔周期
注意:用户的第一个订单没有间隔天数的,需要赋值为0间隔天数在orders表中
scala> val userGap = orderNew.selectExpr("user_id","cast(dspo as int) as dspo").groupBy("user_id").avg("dspo")scala> userGap.show(5)+-------+------------------+
|user_id| avg(dspo)|
+-------+------------------+
| 296| 5.428571428571429|
| 467| 8.833333333333334|
| 675| 20.0|
| 691|13.173913043478262|
| 829| 9.25|
+-------+------------------+
only showing top 5 rows
将avg(dspo)进行重命名
scala> val userGap = orderNew.selectExpr("user_id","cast(dspo as int) as dspo").groupBy("user_id").avg("dspo").withColumnRenamed("avg(dspo)","u_avg_day_gap")
3.每个用户的总订单数量(分组)
scala> val userOrdCnt = orders.groupBy("user_id").count()scala> userOrdCnt.show(5)+-------+-----+
|user_id|count|
+-------+-----+
| 296| 7|
| 467| 6|
| 675| 11|
| 691| 23|
| 829| 4|
+-------+-----+
only showing top 5 rows
4.每个用户购买的product商品去重后的集合数据
结果: 1001 101200,120219,129101
将orders与priors表进行关联
问题:""代表String ’ '代表char类型
scala> val opDF = orders.join(priors,"order_id")
scala> val up = opDF.select("user_id","product_id")
将DataFrame 转变成RDD需要下面隐式转换包,利用隐式转换 因为后面Spark的算子操作如map()、groupBy()
DataFrame中没有mapValues()!!!!!!
注意:在使用RDD算子前进行(.rdd)转换就行
import spark.implicits._
注意:RDD类型没有show() 只有take()来查看数据
a、将product_id按照user_id进行分组
groupByKey() 按key进行分组 注意 键值对 => {key:values}
方式一:
scala> val rddGroup = up.rdd.map{x=>(x(0).toString,x(1).toString)}.groupByKey()
方式二:
scala> val rdGroup = up.map{ x=>(x(0).toString,x(1).toString)}.rdd.groupByKey()
结果:take(5) //从RDD中获取5个元素
scala> rddGroup.take(5):36 WARN executor.Executor: Managed memory leak detected; size = 5244782 bytes, TID = 415
res11: Array[(String, Iterable[String])] = Array((124168,CompactBuffer(20082, 20082, 14303, 11323, 46522, 11323, 20082, 20082, 22108, 22108))
, (120928,CompactBuffer(39275, 18897, 27845, 47209, 30440, 34448, 45948, 4605, 27966, 30252, 20995))
b、将商品进行去重
scala> mapValues(record=>record.toSet.mkString(",")) product商品去重(toSet)后的利用mkString(",")逗号进行拼接
var rddRecords = up.rdd.map{x=>(x(0).toString,x(1).toString)}.groupByKey().mapValues(record=>record.toSet.mkString(","))
结果
scala> rddRecords.take(5)
:18 WARN executor.Executor: Managed memory leak detected; size = 5244782 bytes, TID = 214
res9: Array[(String, String)] = Array((124168,20082,20082,14303,11323,46522,11323,20082,20082,22108,22108)
, (120928,39275,18897,27845,47209,30440,34448,45948,4605,27966,30252,20995,5194,13629,36695,25824,42265,39928,13870,41665,39561,4799)
d、RDD 转换成DF
toDF(“自定义名1”,“自定义名2”) 根据RDD已有的列数可以进行重命名
scala> rddRecords.toDF("user_id","prod_cnt").show(5)
结果
size = 5244782 bytes, TID = 423
+-------+--------------------+
|user_id| prod_cnt|
+-------+--------------------+
| 124168|20082,20082,14303...|
| 120928|39275,18897,27845...|
| 186692|11365,6184,14161,...|
| 43535|38185,33129,13187...|
| 8965|33198,25466,27966...|
+-------+--------------------+
4.每个用户总商品数量以及去重后的商品数量(distinct count)
a、user所有的商品数量(没有去重)
val userAllProd = up.groupBy("user_id").count()
scala> userAllProd.show(5)
+-------+-----+
|user_id|count|
+-------+-----+
| 88447| 185|
| 144913| 198|
| 145079| 612|
| 13282| 99|
| 124057| 80|
+-------+-----+
only showing top 5 rows
b、去重后的商品数量
val userUnOrdCnt = up.rdd.map{x=>(x(0).toString, x(1).toString)}.groupByKey().mapValues(_.toSet.size).toDF("user_id","prod_dis_cnt")
汇总关联:
scala> userAllProd.join(userUnOrdCnt,"user_id").select("*").show(10)
+-------+-----+------------+
|user_id|count|prod_dis_cnt|
+-------+-----+------------+
| 100010| 206| 119|
| 100140| 32| 28|
| 100227| 134| 70|
| 100263| 212| 38|
| 100320| 219| 121|
| 100553| 645| 243|
| 100704| 50| 40|
| 100735| 50| 42|
| 100768| 9| 8|
| 10096| 90| 47|
+-------+-----+------------+
only showing top 10 rows
//处理groupByKey的时候,看看能不能进行提前合并
需求:合并“去重后集合的数据”和“去重后的商品数量” 统计逻辑
方式一:合并 提取公因子,引入cache 提高效率
val userRddGroup = up.rdd.map{x=>(x(0).toString, x(1).toString)}.groupByKey().cache()userRddGroup.mapValues(_.toSet.mkString(",")).toDF("user_id","prod_records")
从缓存中移除 python del xxx
userRddGroup.unpersist()
方式二:同时计算两个
//返回tuple类型 (1.商品的大小(size),2.商品的List集合,利用逗号进行拼接)
mapValues{records=>
val rs = records.toSet; (rs.size , rs.mkString(","))}
val userProRcdSize = up.rdd.map{x=>(x(0).toString, x(1).toString)}.groupByKey().mapValues{records=>val rs = records.toSet;//返回tuple类型 (1.商品的大小(size),2.商品的List集合,利用逗号进行拼接)(rs.size , rs.mkString(","))}.toDF("user_id", "tuple").selectExpr("user_id","tuple._1 as prod_dist_size", "tuple._2 as prod_records")
方式三:使用自带的函数的处理
as(“af”) 重命名 可以使用 withColumnRenamed()替代但是位置不同
//导入sql中所有的包
import org.apache.spark.sql.functions._val usergroup = up.groupBy("user_id").agg(size(collect_set("product_id")).as("prod_dist_size"),collect_set("product_id").as("prod_records"))
结果:
+-------+--------------+--------------------+
|user_id|prod_dist_size| prod_records|
+-------+--------------+--------------------+
| 100010| 119|[7751, 27360, 247...|
| 100140| 28|[7021, 27845, 436...|
| 100227| 70|[24834, 45007, 27...|
| 100263| 38|[5157, 38928, 372...|
| 100320| 121|[39891, 34358, 17...|
| 100553| 243|[6873, 46075, 205...|
| 100704| 40|[14999, 3434, 308...|
| 100735| 42|[45368, 7430, 207...|
| 100768| 8|[40199, 18838, 49...|
| 10096| 47|[8021, 13629, 895...|
+-------+--------------+--------------------+
only showing top 10 rows
5.每个用户购买的平均每个订单的商品数量(hive已经实现过了)
我今天购买了2个order,一个是10个商品,另一个是4个商品
(10+4)一个订单对应多少个商品 / 2
结果:一个用户购买了几个商品=7
// 1) 每个订单有多少个商品
val ordProdCnt = priors.groupBy("order_id").count()// 2)求每个用户订单商品数量的平均值 user_id product_id
val userPerOrdProdCnt = orders.join(ordProdCnt, "order_id").groupBy("user_id").agg(avg("count").as("u_avg_ord_prods"))
结果:
scala> userPerOrdProdCnt.show(5)
+-------+---------------+
|user_id|u_avg_ord_prods|
+-------+---------------+
| 88447| 7.4|
| 144913| 5.5|
| 145079| 8.5|
| 13282| 6.1875|
| 124057| 20.0|
+-------+---------------+
only showing top 5 rows
Spark DataFrameRDD案例实现相关推荐
- 从原理到策略算法再到架构产品看推荐系统 | 附Spark实践案例
原文链接:mp.weixin.qq.com 作者 | HCY崇远 01 前言 本文源自于前阵子连续更新的推荐系统系列,前段时间给朋友整理一个关于推荐系统相关的知识教学体系,刚好自身业务中,预计明年初 ...
- Spark RDD案例(五)经纬度转换为地理位置
Spark RDD案例(五)经纬度转换为地理位置 1. 背景 Spark作为大数据分析引擎,本身可以做离线和准实时数据处理 Spark抽象出的操作对象如RDD.dataSet.dataFrame.DS ...
- 大数据Spark入门案例5–统计广告点击数量排行Top3(scala版本)
大数据Spark入门案例5–统计每广告点击数量排行Top3(scala版本) 1 数据准备 链接:https://pan.baidu.com/s/1afzmL-hNsAJl1_gx_dH2ag 提取码 ...
- Apache Spark+PyTorch 案例实战
Apache Spark+PyTorch 案例实战 随着数据量和复杂性的不断增长,深度学习是提供大数据预测分析解决方案的理想方法,需要增加计算处理能力和更先进的图形处理器.通过深度学习,能够利用非结 ...
- Spark商业案例与性能调优实战100课》第16课:商业案例之NBA篮球运动员大数据分析系统架构和实现思路
Spark商业案例与性能调优实战100课>第16课:商业案例之NBA篮球运动员大数据分析系统架构和实现思路 http://www.basketball-reference.com/leagues ...
- Spark商业案例与性能调优实战100课》第2课:商业案例之通过RDD实现分析大数据电影点评系统中电影流行度分析
Spark商业案例与性能调优实战100课>第2课:商业案例之通过RDD实现分析大数据电影点评系统中电影流行度分析 package com.dt.spark.coresimport org.apa ...
- 《Spark商业案例与性能调优实战100课》第17课:商业案例之NBA篮球运动员大数据分析系统代码实战
<<<Spark商业案例与性能调优实战100课>第17课:商业案例之NBA篮球运动员大数据分析系统代码实战
- 《Spark商业案例与性能调优实战100课》第18课:商业案例之NBA篮球运动员大数据分析代码实战之核心基础数据项编写
<Spark商业案例与性能调优实战100课>第18课:商业案例之NBA篮球运动员大数据分析代码实战之核心基础数据项编写
- 《Spark商业案例与性能调优实战100课》第15课:商业案例之纯粹通过DataSet进行电商交互式分析系统中各种类型TopN分析实战详解
<Spark商业案例与性能调优实战100课>第15课:商业案例之纯粹通过DataSet进行电商交互式分析系统中各种类型TopN分析实战详解
最新文章
- python大量数据折线图-Python数据可视化练习:各种折线图的用法
- GDCM:gdcm::Item的测试程序
- [WPF系列]Button 自定义
- Qt调用OpenCV汇总(1)
- 模板 - 快速沃尔什变换
- 网易java默认路径_java对象存储位置
- php7连接mysql测试代码
- apt-get安装mysql
- 三星可折叠手机Galaxy F再曝光 外观酷炫设计出色
- php properties,PHP ReflectionClass getStaticProperties()用法及代码示例
- bzoj 1085: [SCOI2005]骑士精神(IDA*)
- 使用ildasm获取源代码_有什么比ILDasm好? ILSpy和dnSpy是反编译.NET代码的工具
- java程序开发的简历_Java程序开发工作简历
- 标定学习笔记(四)-- 手眼标定详解
- xmpp即时通讯协议的特性---长处和缺点!
- oracle双活svc,SVC存储虚拟化双活方案
- CSS实现文字竖向排列
- 通达信 服务器 修改,通达信验证服务器数据库修改
- 120行代码爬取电子书网站
- 激光测距仪系统设计 c语言程序),激光测距仪系统设计(机械图,电路图,c语言程序)...
热门文章
- python中ta_非常详细的Ta-Lib安装及使用教程
- java 找到一行 更换单词_Java实现对一行英文进行单词提取功能示例
- Xamarin 技术全解析
- 前端框架——Jquery——基础篇7__工具函数(Utils)
- verilog实现多周期处理器之——(四)逻辑,移位操作与空指令的添加
- 你为什么选择计算机应用专业,致新生!我为什么选择信息工程系
- redis 内存管理分析
- PowerShell 方式部署Sharepoint Solution
- 【安卓深度控件开发(2.2)】LCDView - 进阶绘图
- springMVC使用拦截器针对控制器方法做预处理、后处理、资源清理