Spark DataFrame&RDD案例实现

  • 1.DataFram类型
  • 2.需求:将orders表中days_since_prior_order中的空置(" ")转换成0
  • idea中快捷键
  • 2.每个用户平均购买订单的间隔周期
  • 3.每个用户的总订单数量(分组)
  • 4.每个用户购买的product商品去重后的集合数据
  • 4.每个用户总商品数量以及去重后的商品数量(distinct count)
  • 需求:合并“去重后集合的数据”和“去重后的商品数量” 统计逻辑
  • 5.每个用户购买的平均每个订单的商品数量(hive已经实现过了)

1.DataFram类型

针对NULL值可以转换成0

df.na.fill(0)

2.需求:将orders表中days_since_prior_order中的空置(" ")转换成0

val orders = spark.sql("select * from badou.orders")
val priors = spark.sql("select * from badou.priors")orders表
+--------+-------+--------+------------+---------+-----------------+----------------------+
|order_id|user_id|eval_set|order_number|order_dow|order_hour_of_day|days_since_prior_order|
+--------+-------+--------+------------+---------+-----------------+----------------------+
| 2539329|      1|   prior|           1|        2|               08|                      |
| 2398795|      1|   prior|           2|        3|               07|                  15.0|
|  473747|      1|   prior|           3|        3|               12|                  21.0|
| 2254736|      1|   prior|           4|        4|               07|                  29.0|
|  431534|      1|   prior|           5|        4|               15|                  28.0|
+--------+-------+--------+------------+---------+-----------------+----------------------+

因为days_since_prior_order中有空值,所以要进行空值填充,可以利用selectExpr()

scala> orders.selectExpr("*","if(days_since_prior_order='',0.0 ,days_since_prior_order) as dspo").show(5)
+--------+-------+--------+------------+---------+-----------------+----------------------+----+
|order_id|user_id|eval_set|order_number|order_dow|order_hour_of_day|days_since_prior_order|dspo|
+--------+-------+--------+------------+---------+-----------------+----------------------+----+
| 2539329|      1|   prior|           1|        2|               08|                      | 0.0|
| 2398795|      1|   prior|           2|        3|               07|                  15.0|15.0|
|  473747|      1|   prior|           3|        3|               12|                  21.0|21.0|
| 2254736|      1|   prior|           4|        4|               07|                  29.0|29.0|
|  431534|      1|   prior|           5|        4|               15|                  28.0|28.0|
+--------+-------+--------+------------+---------+-----------------+----------------------+----+
only showing top 5 rows

删除days_since_prior_order这列

scala> val orderNew = orders.selectExpr("*","if(days_since_prior_order='',0.0,days_since_prior_order) as dspo").drop("days_since_prior_order")
orderNew: org.apache.spark.sql.DataFrame = [order_id: string, user_id: string ... 5 more fields]scala> orderNew.show(5)
+--------+-------+--------+------------+---------+-----------------+----+
|order_id|user_id|eval_set|order_number|order_dow|order_hour_of_day|dspo|
+--------+-------+--------+------------+---------+-----------------+----+
| 2539329|      1|   prior|           1|        2|               08| 0.0|
| 2398795|      1|   prior|           2|        3|               07|15.0|
|  473747|      1|   prior|           3|        3|               12|21.0|
| 2254736|      1|   prior|           4|        4|               07|29.0|
|  431534|      1|   prior|           5|        4|               15|28.0|
+--------+-------+--------+------------+---------+-----------------+----+

idea中快捷键

将多行合并成一场 ctrl shift + j
返回上一步操作 : ctrl + z

2.每个用户平均购买订单的间隔周期

注意:用户的第一个订单没有间隔天数的,需要赋值为0间隔天数在orders表中

scala> val userGap = orderNew.selectExpr("user_id","cast(dspo as int) as dspo").groupBy("user_id").avg("dspo")scala> userGap.show(5)+-------+------------------+
|user_id|         avg(dspo)|
+-------+------------------+
|    296| 5.428571428571429|
|    467| 8.833333333333334|
|    675|              20.0|
|    691|13.173913043478262|
|    829|              9.25|
+-------+------------------+
only showing top 5 rows

将avg(dspo)进行重命名

scala> val userGap = orderNew.selectExpr("user_id","cast(dspo as int) as dspo").groupBy("user_id").avg("dspo").withColumnRenamed("avg(dspo)","u_avg_day_gap")

3.每个用户的总订单数量(分组)

scala> val userOrdCnt = orders.groupBy("user_id").count()scala> userOrdCnt.show(5)+-------+-----+
|user_id|count|
+-------+-----+
|    296|    7|
|    467|    6|
|    675|   11|
|    691|   23|
|    829|    4|
+-------+-----+
only showing top 5 rows

4.每个用户购买的product商品去重后的集合数据

结果: 1001 101200,120219,129101
将orders与priors表进行关联
问题:""代表String ’ '代表char类型

scala> val opDF = orders.join(priors,"order_id")
scala> val up = opDF.select("user_id","product_id")

将DataFrame 转变成RDD需要下面隐式转换包,利用隐式转换 因为后面Spark的算子操作如map()、groupBy()

DataFrame中没有mapValues()!!!!!!
注意:在使用RDD算子前进行(.rdd)转换就行

import spark.implicits._

注意:RDD类型没有show() 只有take()来查看数据

a、将product_id按照user_id进行分组
groupByKey() 按key进行分组 注意 键值对 => {key:values}

方式一:
scala> val rddGroup = up.rdd.map{x=>(x(0).toString,x(1).toString)}.groupByKey()
方式二:
scala> val rdGroup = up.map{ x=>(x(0).toString,x(1).toString)}.rdd.groupByKey()
结果:take(5) //从RDD中获取5个元素
scala> rddGroup.take(5):36 WARN executor.Executor: Managed memory leak detected; size = 5244782 bytes, TID = 415
res11: Array[(String, Iterable[String])] = Array((124168,CompactBuffer(20082, 20082, 14303, 11323, 46522, 11323, 20082, 20082, 22108, 22108))
, (120928,CompactBuffer(39275, 18897, 27845, 47209, 30440, 34448, 45948, 4605, 27966, 30252, 20995))

b、将商品进行去重

scala> mapValues(record=>record.toSet.mkString(","))   product商品去重(toSet)后的利用mkString(",")逗号进行拼接
var rddRecords = up.rdd.map{x=>(x(0).toString,x(1).toString)}.groupByKey().mapValues(record=>record.toSet.mkString(","))
结果
scala> rddRecords.take(5)
:18 WARN executor.Executor: Managed memory leak detected; size = 5244782 bytes, TID = 214
res9: Array[(String, String)] = Array((124168,20082,20082,14303,11323,46522,11323,20082,20082,22108,22108)
, (120928,39275,18897,27845,47209,30440,34448,45948,4605,27966,30252,20995,5194,13629,36695,25824,42265,39928,13870,41665,39561,4799)

d、RDD 转换成DF
toDF(“自定义名1”,“自定义名2”) 根据RDD已有的列数可以进行重命名

scala> rddRecords.toDF("user_id","prod_cnt").show(5)
结果
size = 5244782 bytes, TID = 423
+-------+--------------------+
|user_id|            prod_cnt|
+-------+--------------------+
| 124168|20082,20082,14303...|
| 120928|39275,18897,27845...|
| 186692|11365,6184,14161,...|
|  43535|38185,33129,13187...|
|   8965|33198,25466,27966...|
+-------+--------------------+

4.每个用户总商品数量以及去重后的商品数量(distinct count)

a、user所有的商品数量(没有去重)

val  userAllProd = up.groupBy("user_id").count()
scala> userAllProd.show(5)
+-------+-----+
|user_id|count|
+-------+-----+
|  88447|  185|
| 144913|  198|
| 145079|  612|
|  13282|   99|
| 124057|   80|
+-------+-----+
only showing top 5 rows

b、去重后的商品数量

val userUnOrdCnt = up.rdd.map{x=>(x(0).toString, x(1).toString)}.groupByKey().mapValues(_.toSet.size).toDF("user_id","prod_dis_cnt")

汇总关联:

scala> userAllProd.join(userUnOrdCnt,"user_id").select("*").show(10)
+-------+-----+------------+
|user_id|count|prod_dis_cnt|
+-------+-----+------------+
| 100010|  206|         119|
| 100140|   32|          28|
| 100227|  134|          70|
| 100263|  212|          38|
| 100320|  219|         121|
| 100553|  645|         243|
| 100704|   50|          40|
| 100735|   50|          42|
| 100768|    9|           8|
|  10096|   90|          47|
+-------+-----+------------+
only showing top 10 rows

//处理groupByKey的时候,看看能不能进行提前合并

需求:合并“去重后集合的数据”和“去重后的商品数量” 统计逻辑

方式一:合并 提取公因子,引入cache 提高效率

val userRddGroup = up.rdd.map{x=>(x(0).toString, x(1).toString)}.groupByKey().cache()userRddGroup.mapValues(_.toSet.mkString(",")).toDF("user_id","prod_records")
从缓存中移除  python  del xxx
userRddGroup.unpersist()

方式二:同时计算两个

//返回tuple类型 (1.商品的大小(size),2.商品的List集合,利用逗号进行拼接)
mapValues{records=>
val rs = records.toSet;    (rs.size , rs.mkString(","))}
val userProRcdSize = up.rdd.map{x=>(x(0).toString, x(1).toString)}.groupByKey().mapValues{records=>val rs = records.toSet;//返回tuple类型 (1.商品的大小(size),2.商品的List集合,利用逗号进行拼接)(rs.size , rs.mkString(","))}.toDF("user_id", "tuple").selectExpr("user_id","tuple._1 as prod_dist_size", "tuple._2 as prod_records")

方式三:使用自带的函数的处理
as(“af”) 重命名 可以使用 withColumnRenamed()替代但是位置不同

//导入sql中所有的包
import org.apache.spark.sql.functions._val usergroup = up.groupBy("user_id").agg(size(collect_set("product_id")).as("prod_dist_size"),collect_set("product_id").as("prod_records"))
结果:
+-------+--------------+--------------------+
|user_id|prod_dist_size|        prod_records|
+-------+--------------+--------------------+
| 100010|           119|[7751, 27360, 247...|
| 100140|            28|[7021, 27845, 436...|
| 100227|            70|[24834, 45007, 27...|
| 100263|            38|[5157, 38928, 372...|
| 100320|           121|[39891, 34358, 17...|
| 100553|           243|[6873, 46075, 205...|
| 100704|            40|[14999, 3434, 308...|
| 100735|            42|[45368, 7430, 207...|
| 100768|             8|[40199, 18838, 49...|
|  10096|            47|[8021, 13629, 895...|
+-------+--------------+--------------------+
only showing top 10 rows

5.每个用户购买的平均每个订单的商品数量(hive已经实现过了)

我今天购买了2个order,一个是10个商品,另一个是4个商品
(10+4)一个订单对应多少个商品 / 2
结果:一个用户购买了几个商品=7

//    1) 每个订单有多少个商品
val ordProdCnt = priors.groupBy("order_id").count()//    2)求每个用户订单商品数量的平均值  user_id  product_id
val userPerOrdProdCnt = orders.join(ordProdCnt, "order_id").groupBy("user_id").agg(avg("count").as("u_avg_ord_prods"))
结果:
scala> userPerOrdProdCnt.show(5)
+-------+---------------+
|user_id|u_avg_ord_prods|
+-------+---------------+
|  88447|            7.4|
| 144913|            5.5|
| 145079|            8.5|
|  13282|         6.1875|
| 124057|           20.0|
+-------+---------------+
only showing top 5 rows

Spark DataFrameRDD案例实现相关推荐

  1. 从原理到策略算法再到架构产品看推荐系统 | 附Spark实践案例

    原文链接:mp.weixin.qq.com  作者 | HCY崇远 01 前言 本文源自于前阵子连续更新的推荐系统系列,前段时间给朋友整理一个关于推荐系统相关的知识教学体系,刚好自身业务中,预计明年初 ...

  2. Spark RDD案例(五)经纬度转换为地理位置

    Spark RDD案例(五)经纬度转换为地理位置 1. 背景 Spark作为大数据分析引擎,本身可以做离线和准实时数据处理 Spark抽象出的操作对象如RDD.dataSet.dataFrame.DS ...

  3. 大数据Spark入门案例5–统计广告点击数量排行Top3(scala版本)

    大数据Spark入门案例5–统计每广告点击数量排行Top3(scala版本) 1 数据准备 链接:https://pan.baidu.com/s/1afzmL-hNsAJl1_gx_dH2ag 提取码 ...

  4. Apache Spark+PyTorch 案例实战

    Apache Spark+PyTorch 案例实战  随着数据量和复杂性的不断增长,深度学习是提供大数据预测分析解决方案的理想方法,需要增加计算处理能力和更先进的图形处理器.通过深度学习,能够利用非结 ...

  5. Spark商业案例与性能调优实战100课》第16课:商业案例之NBA篮球运动员大数据分析系统架构和实现思路

    Spark商业案例与性能调优实战100课>第16课:商业案例之NBA篮球运动员大数据分析系统架构和实现思路 http://www.basketball-reference.com/leagues ...

  6. Spark商业案例与性能调优实战100课》第2课:商业案例之通过RDD实现分析大数据电影点评系统中电影流行度分析

    Spark商业案例与性能调优实战100课>第2课:商业案例之通过RDD实现分析大数据电影点评系统中电影流行度分析 package com.dt.spark.coresimport org.apa ...

  7. 《Spark商业案例与性能调优实战100课》第17课:商业案例之NBA篮球运动员大数据分析系统代码实战

    <<<Spark商业案例与性能调优实战100课>第17课:商业案例之NBA篮球运动员大数据分析系统代码实战

  8. 《Spark商业案例与性能调优实战100课》第18课:商业案例之NBA篮球运动员大数据分析代码实战之核心基础数据项编写

    <Spark商业案例与性能调优实战100课>第18课:商业案例之NBA篮球运动员大数据分析代码实战之核心基础数据项编写

  9. 《Spark商业案例与性能调优实战100课》第15课:商业案例之纯粹通过DataSet进行电商交互式分析系统中各种类型TopN分析实战详解

    <Spark商业案例与性能调优实战100课>第15课:商业案例之纯粹通过DataSet进行电商交互式分析系统中各种类型TopN分析实战详解

最新文章

  1. python大量数据折线图-Python数据可视化练习:各种折线图的用法
  2. GDCM:gdcm::Item的测试程序
  3. [WPF系列]Button 自定义
  4. Qt调用OpenCV汇总(1)
  5. 模板 - 快速沃尔什变换
  6. 网易java默认路径_java对象存储位置
  7. php7连接mysql测试代码
  8. apt-get安装mysql
  9. 三星可折叠手机Galaxy F再曝光 外观酷炫设计出色
  10. php properties,PHP ReflectionClass getStaticProperties()用法及代码示例
  11. bzoj 1085: [SCOI2005]骑士精神(IDA*)
  12. 使用ildasm获取源代码_有什么比ILDasm好? ILSpy和dnSpy是反编译.NET代码的工具
  13. java程序开发的简历_Java程序开发工作简历
  14. 标定学习笔记(四)-- 手眼标定详解
  15. xmpp即时通讯协议的特性---长处和缺点!
  16. oracle双活svc,SVC存储虚拟化双活方案
  17. CSS实现文字竖向排列
  18. 通达信 服务器 修改,通达信验证服务器数据库修改
  19. 120行代码爬取电子书网站
  20. 激光测距仪系统设计 c语言程序),激光测距仪系统设计(机械图,电路图,c语言程序)...

热门文章

  1. python中ta_非常详细的Ta-Lib安装及使用教程
  2. java 找到一行 更换单词_Java实现对一行英文进行单词提取功能示例
  3. Xamarin 技术全解析
  4. 前端框架——Jquery——基础篇7__工具函数(Utils)
  5. verilog实现多周期处理器之——(四)逻辑,移位操作与空指令的添加
  6. 你为什么选择计算机应用专业,致新生!我为什么选择信息工程系
  7. redis 内存管理分析
  8. PowerShell 方式部署Sharepoint Solution
  9. 【安卓深度控件开发(2.2)】LCDView - 进阶绘图
  10. springMVC使用拦截器针对控制器方法做预处理、后处理、资源清理