Apache Spark 练习七:使用Spark分析化妆品电子商务数据
一、源数据
本章所分析的数据来自于Kaggle公开的化妆品电子商务数据(https://www.kaggle.com/datasets/nowingkim/ecommerce-data-cosmetics-shop)。该数据具体包括以下字段:
column name | description |
---|---|
time | Time when event happened at (in UTC). |
event_name | 4 kinds of value: purchase, cart, view, removefromcart |
product_id | ID of a product |
category_id | Product's category ID |
category_name | Product's category taxonomy (code name) if it was possible to make it. Usually present for meaningful categories and skipped for different kinds of accessories. |
brand | Downcased string of brand name. |
price | Float price of a product. |
user_id | Permanent user ID. |
session | Temporary user's session ID. Same for each user's session. Is changed every time user come back to online store from a long pause. |
category_1 | Largest class of product included |
category_2 | Bigger class of product included |
category_3 | Smallest class of product included |
在开始下面的练习前,将csv文件中的数据全部写入到Kafka的“E_Commerce”消息主题中。
二、练习题
0. 数据预处理
首先,我们对Kafka中的原始数据进行预处理,得到Structured Streaming Dateset。
val spark = SparkSession.builder().appName("Online Shopping").getOrCreate()
spark.sparkContext.setLogLevel("WARN")
import spark.implicits._// 从Kafka中加载源数据
val schema = new StructType().add("time", "timestamp").add("event_name", "string").add("product_id", "string").add("category_id", "string").add("category_name", "string").add("brand", "string").add("price", "double").add("user_id", "string").add("session", "string").add("category_1", "string").add("category_2", "string").add("category_3", "string")
val df = spark.readStream.format("kafka")// Kafka配置:IP:localhost:9092;Topic:E_Commerce.option("kafka.bootstrap.servers", "localhost:9092").option("subscribe", "E_Commerce")// 为更好的进行模拟,设置从头开始消费数据.option("startingOffsets", "earliest")// 为模拟micro-batch的效果,设置每一批数据的最大规模为100000.option("maxOffsetsPerTrigger", 100000).load()// 从Kafka的消息中提取value.selectExpr("CAST(value AS STRING)")// 将按逗号分隔的数据切分为列.select(from_csv($"value",schema,Map("sep" -> ",", "timestampFormat" -> "yyyy-MM-dd HH:mm:ssZ")).as("data")).select("data.*")
1. 找出购买了价格超过100元的家具类商品的购买时间、商品ID和用户ID
val res = df.where("price > 100 and category_1 = 'furniture'").select("time", "product_id", "user_id")
2. 统计每种品牌的累计销售金额
val res = df.filter($"brand" =!= "Not defined" and $"event_name" === "purchase").groupBy($"brand").sum("price")
3. 找出实时交易金额最高的10个品牌名(不含Not defined)
val res = df.withWatermark("time", "10 minute").filter($"event_name" === "purchase" and $"brand" =!= "Not defined").groupBy($"brand").sum("price").orderBy($"sum(price)".desc).limit(10)
4. 统计每小时的用户浏览量、用户购买量
val res = df.withWatermark("time", "10 minutes").filter($"event_name" === "view" or $"event_name" === "purchase").groupBy(window($"time", "1 hour", "1 hour", "0 seconds")).agg(sum(when($"event_name" === "view", 1).otherwise(0)).as("total_view"),sum(when($"event_name" === "purchase", 1).otherwise(0)).as("total_purchase"))
5. 统计每天的交易金额
val res = df.withWatermark("time", "10 minutes").filter($"event_name" === "purchase").groupBy(window($"time", "1 day")).sum("price")
6. 找出每天浏览量最高的10款商品的商品ID
val res = df.withWatermark("time", "10 minutes").filter($"event_name" === "view").groupBy(window($"time", "1 day"), $"product_id").count().orderBy($"window".desc, $"count".desc).limit(10)
7. 找出添加购物车后24小时内购买该商品的用户
val res = df.withWatermark("time", "10 minutes").filter($"event_name" === "cart").as("cart").join(df.withWatermark("time", "10 minutes").filter($"event_name" === "purchase").as("purchase"),expr("""|cart.user_id = purchase.user_id AND|purchase.time >= cart.time AND|purchase.time <= cart.time + interval 24 hours|""".stripMargin),joinType = "inner").select("cart.user_id")
8. 找出浏览了商品但在24小时内未购买它的用户
val res = df.withWatermark("time", "10 minutes").filter($"event_name" === "view").as("view").join(df.withWatermark("time", "10 minutes").filter($"event_name" === "purchase").as("purchase"),expr("""|view.user_id = purchase.user_id AND|purchase.time >= view.time AND|purchase.time <= view.time + interval 24 hours|""".stripMargin),joinType = "leftOuter").where("purchase.user_id IS NULL").select("view.user_id")
Apache Spark 练习七:使用Spark分析化妆品电子商务数据相关推荐
- 学习笔记Spark(七)—— Spark SQL应用(2)—— Spark DataFrame基础操作
二.Spark DataFrame基础操作 2.1.DataFrame DataFrame是一种不可变的分布式数据集,这种数据集被组织成指定的列,类似于关系数据库中的表. 数据集的每一列都带有名称和类 ...
- python 基于情感词典的情感分析之乐,惧,惊,哀,恶,怒和未知七种情感分析
背景 情感分析是通过计算技术对文本内容的主观客观性.情绪等挖掘分析,对文本的情感偏向做出判断.目的是识别出文本中的具体情感分类,之前做文本分类都是通过深度学习或者机器学习进行文本分类,但是需要进行数据 ...
- Apache 流框架 Flink,Spark Streaming,Storm对比分析(一)
https://bigdata.163.com/product/article/5 Apache 流框架 Flink,Spark Streaming,Storm对比分析(一) 转载于:https:// ...
- Flume+Kafka+Spark Streaming+MySQL实时日志分析
文章目录 项目背景 案例需求 一.分析 1.日志分析 二.日志采集 第一步.代码编辑 2.启动采集代码 三.编写Spark Streaming的代码 第一步 创建工程 第二步 选择创建Scala工程 ...
- 【原】Spark中Master源码分析(一)
Master作为集群的Manager,对于集群的健壮运行发挥着十分重要的作用.下面,我们一起了解一下Master是听从Client(Leader)的号召,如何管理好Worker的吧. 1.家当(静态属 ...
- spark算子大全glom_(七)Spark Streaming 算子梳理 — repartition算子
目录 天小天:(一)Spark Streaming 算子梳理 - 简单介绍streaming运行逻辑 天小天:(二)Spark Streaming 算子梳理 - flatMap和mapPartitio ...
- 记录hiveonspark:Execution Error, return code 30041 from org.apache.hadoop.hive.ql.exec.spark.SparkTask
问题描述: 在部署hive on spark,测试时报错,执行建表操作成功,但是插入insert出现以下错误: Failed to execute spark task, with exception ...
- Ignite与Spark内存计算平台对比分析
为什么80%的码农都做不了架构师?>>> 经常有人拿Ignite和Spark进行比较,然后搞不清两者的区别和联系.Ignite和Spark,如果笼统归类,都会归类于内存计算平台 ...
- Spark资源调度机制源码分析--基于spreadOutApps及非spreadOutApps两种资源调度算法
Spark资源调度机制源码分析--基于spreadOutApps及非spreadOutApps两种资源调度算法 1.spreadOutApp尽量平均分配到每个executor上: 2.非spreadO ...
最新文章
- LeetCode简单题之检查两个字符串数组是否相等
- 小冰公司CEO李笛:强大的AI工具不应该给企业,应该给劳动者 | MEET2021
- 业务专题篇:用户增长分析
- 【渝粤题库】陕西师范大学200431综合英语(一)作业(高起专、高起本)
- Attention模型:我的注意力跟你们人类不一样
- 丘成桐:年轻学者要敢于“无法无天”
- 玩游戏学微积分!探索理工锻炼的游戏化应用, 腾讯发布新游戏《微积历险记》...
- 14 对于移动类型561和账户xxxxxx Ext. GA本币计的金额(023)的不同的字段
- SpringMVC自学日志03(SpringMVC注解)
- 强强合体:Docker版Kali Linux发布
- 数字资产价值巨大,GMQGroup深入布局挖掘数字财富
- Bootstrap Wizard 多步表单控件
- 2021-06-19列表的介绍
- python实现,excel随机抽取特定行到新表中(附上源码和桌面软件)
- 西电计算机软件考研,西安电子科技大学软件工程硕士考研
- pandas读取xlsx文件
- 猴子搬香蕉 php,猴子搬香蕉
- OEM 11g在win7 ie11下报错“证书错误,导航已阻止”的恢复方法
- Java实现swap交换函数的数组方法
- 有关“夜壶冲”的由来
热门文章
- 寻仙手游维护公告服务器停服更新,寻仙手游1月25日停服更新公告 1月25日更新了哪些内容...
- 详细举例说明:原码、反码、补码
- python keyshot_KeyShot 7 非常好用的3D渲染软件
- 如何隐藏C,D,E,F,G,H,I....盘, Windows系统中巧妙隐藏驱动器
- 听说有人要给学员的CSGO/steam游戏搬砖项目投资?
- 站群服务器和普通服务器区别
- 谷歌piexl手机如何刷机 root
- 西安交通大学城市学院计算机课本,西安交通大学城市学院计算机系赴陕西科技大学进行专业课“课程思政”教学交流...
- 计算机网络技术电子工程,浅析计算机网络技术在电子信息工程中的应用
- 【827. 最大人工岛】