一、源数据

本章所分析的数据来自于Kaggle公开的化妆品电子商务数据(https://www.kaggle.com/datasets/nowingkim/ecommerce-data-cosmetics-shop)。该数据具体包括以下字段:

column name description
time Time when event happened at (in UTC).
event_name 4 kinds of value: purchase, cart, view, removefromcart
product_id ID of a product
category_id Product's category ID
category_name Product's category taxonomy (code name) if it was possible to make it. Usually present for meaningful categories and skipped for different kinds of accessories.
brand Downcased string of brand name.
price Float price of a product.
user_id Permanent user ID.
session Temporary user's session ID. Same for each user's session. Is changed every time user come back to online store from a long pause.
category_1 Largest class of product included
category_2 Bigger class of product included
category_3 Smallest class of product included

在开始下面的练习前,将csv文件中的数据全部写入到Kafka的“E_Commerce”消息主题中。

二、练习题

0. 数据预处理

首先,我们对Kafka中的原始数据进行预处理,得到Structured Streaming Dateset。

val spark = SparkSession.builder().appName("Online Shopping").getOrCreate()
spark.sparkContext.setLogLevel("WARN")
import spark.implicits._// 从Kafka中加载源数据
val schema = new StructType().add("time", "timestamp").add("event_name", "string").add("product_id", "string").add("category_id", "string").add("category_name", "string").add("brand", "string").add("price", "double").add("user_id", "string").add("session", "string").add("category_1", "string").add("category_2", "string").add("category_3", "string")
val df = spark.readStream.format("kafka")// Kafka配置:IP:localhost:9092;Topic:E_Commerce.option("kafka.bootstrap.servers", "localhost:9092").option("subscribe", "E_Commerce")// 为更好的进行模拟,设置从头开始消费数据.option("startingOffsets", "earliest")// 为模拟micro-batch的效果,设置每一批数据的最大规模为100000.option("maxOffsetsPerTrigger", 100000).load()// 从Kafka的消息中提取value.selectExpr("CAST(value AS STRING)")// 将按逗号分隔的数据切分为列.select(from_csv($"value",schema,Map("sep" -> ",", "timestampFormat" -> "yyyy-MM-dd HH:mm:ssZ")).as("data")).select("data.*")

1. 找出购买了价格超过100元的家具类商品的购买时间、商品ID和用户ID

val res = df.where("price > 100 and category_1 = 'furniture'").select("time", "product_id", "user_id")

2. 统计每种品牌的累计销售金额

val res = df.filter($"brand" =!= "Not defined" and $"event_name" === "purchase").groupBy($"brand").sum("price")

3. 找出实时交易金额最高的10个品牌名(不含Not defined)

val res = df.withWatermark("time", "10 minute").filter($"event_name" === "purchase" and $"brand" =!= "Not defined").groupBy($"brand").sum("price").orderBy($"sum(price)".desc).limit(10)

4. 统计每小时的用户浏览量、用户购买量

val res = df.withWatermark("time", "10 minutes").filter($"event_name" === "view" or $"event_name" === "purchase").groupBy(window($"time", "1 hour", "1 hour", "0 seconds")).agg(sum(when($"event_name" === "view", 1).otherwise(0)).as("total_view"),sum(when($"event_name" === "purchase", 1).otherwise(0)).as("total_purchase"))

5. 统计每天的交易金额

val res = df.withWatermark("time", "10 minutes").filter($"event_name" === "purchase").groupBy(window($"time", "1 day")).sum("price")

6. 找出每天浏览量最高的10款商品的商品ID

val res = df.withWatermark("time", "10 minutes").filter($"event_name" === "view").groupBy(window($"time", "1 day"), $"product_id").count().orderBy($"window".desc, $"count".desc).limit(10)

7. 找出添加购物车后24小时内购买该商品的用户

val res = df.withWatermark("time", "10 minutes").filter($"event_name" === "cart").as("cart").join(df.withWatermark("time", "10 minutes").filter($"event_name" === "purchase").as("purchase"),expr("""|cart.user_id = purchase.user_id AND|purchase.time >= cart.time AND|purchase.time <= cart.time + interval 24 hours|""".stripMargin),joinType = "inner").select("cart.user_id")

8. 找出浏览了商品但在24小时内未购买它的用户

val res = df.withWatermark("time", "10 minutes").filter($"event_name" === "view").as("view").join(df.withWatermark("time", "10 minutes").filter($"event_name" === "purchase").as("purchase"),expr("""|view.user_id = purchase.user_id AND|purchase.time >= view.time AND|purchase.time <= view.time + interval 24 hours|""".stripMargin),joinType = "leftOuter").where("purchase.user_id IS NULL").select("view.user_id")

Apache Spark 练习七:使用Spark分析化妆品电子商务数据相关推荐

  1. 学习笔记Spark(七)—— Spark SQL应用(2)—— Spark DataFrame基础操作

    二.Spark DataFrame基础操作 2.1.DataFrame DataFrame是一种不可变的分布式数据集,这种数据集被组织成指定的列,类似于关系数据库中的表. 数据集的每一列都带有名称和类 ...

  2. python 基于情感词典的情感分析之乐,惧,惊,哀,恶,怒和未知七种情感分析

    背景 情感分析是通过计算技术对文本内容的主观客观性.情绪等挖掘分析,对文本的情感偏向做出判断.目的是识别出文本中的具体情感分类,之前做文本分类都是通过深度学习或者机器学习进行文本分类,但是需要进行数据 ...

  3. Apache 流框架 Flink,Spark Streaming,Storm对比分析(一)

    https://bigdata.163.com/product/article/5 Apache 流框架 Flink,Spark Streaming,Storm对比分析(一) 转载于:https:// ...

  4. Flume+Kafka+Spark Streaming+MySQL实时日志分析

    文章目录 项目背景 案例需求 一.分析 1.日志分析 二.日志采集 第一步.代码编辑 2.启动采集代码 三.编写Spark Streaming的代码 第一步 创建工程 第二步 选择创建Scala工程 ...

  5. 【原】Spark中Master源码分析(一)

    Master作为集群的Manager,对于集群的健壮运行发挥着十分重要的作用.下面,我们一起了解一下Master是听从Client(Leader)的号召,如何管理好Worker的吧. 1.家当(静态属 ...

  6. spark算子大全glom_(七)Spark Streaming 算子梳理 — repartition算子

    目录 天小天:(一)Spark Streaming 算子梳理 - 简单介绍streaming运行逻辑 天小天:(二)Spark Streaming 算子梳理 - flatMap和mapPartitio ...

  7. 记录hiveonspark:Execution Error, return code 30041 from org.apache.hadoop.hive.ql.exec.spark.SparkTask

    问题描述: 在部署hive on spark,测试时报错,执行建表操作成功,但是插入insert出现以下错误: Failed to execute spark task, with exception ...

  8. Ignite与Spark内存计算平台对比分析

    为什么80%的码农都做不了架构师?>>>    经常有人拿Ignite和Spark进行比较,然后搞不清两者的区别和联系.Ignite和Spark,如果笼统归类,都会归类于内存计算平台 ...

  9. Spark资源调度机制源码分析--基于spreadOutApps及非spreadOutApps两种资源调度算法

    Spark资源调度机制源码分析--基于spreadOutApps及非spreadOutApps两种资源调度算法 1.spreadOutApp尽量平均分配到每个executor上: 2.非spreadO ...

最新文章

  1. LeetCode简单题之检查两个字符串数组是否相等
  2. 小冰公司CEO李笛:强大的AI工具不应该给企业,应该给劳动者 | MEET2021
  3. 业务专题篇:用户增长分析
  4. 【渝粤题库】陕西师范大学200431综合英语(一)作业(高起专、高起本)
  5. Attention模型:我的注意力跟你们人类不一样
  6. 丘成桐:年轻学者要敢于“无法无天”
  7. 玩游戏学微积分!探索理工锻炼的游戏化应用, 腾讯发布新游戏《微积历险记》...
  8. 14 对于移动类型561和账户xxxxxx Ext. GA本币计的金额(023)的不同的字段
  9. SpringMVC自学日志03(SpringMVC注解)
  10. 强强合体:Docker版Kali Linux发布
  11. 数字资产价值巨大,GMQGroup深入布局挖掘数字财富
  12. Bootstrap Wizard 多步表单控件
  13. 2021-06-19列表的介绍
  14. python实现,excel随机抽取特定行到新表中(附上源码和桌面软件)
  15. 西电计算机软件考研,西安电子科技大学软件工程硕士考研
  16. pandas读取xlsx文件
  17. 猴子搬香蕉 php,猴子搬香蕉
  18. OEM 11g在win7 ie11下报错“证书错误,导航已阻止”的恢复方法
  19. Java实现swap交换函数的数组方法
  20. 有关“夜壶冲”的由来

热门文章

  1. 寻仙手游维护公告服务器停服更新,寻仙手游1月25日停服更新公告 1月25日更新了哪些内容...
  2. 详细举例说明:原码、反码、补码
  3. python keyshot_KeyShot 7 非常好用的3D渲染软件
  4. 如何隐藏C,D,E,F,G,H,I....盘, Windows系统中巧妙隐藏驱动器
  5. 听说有人要给学员的CSGO/steam游戏搬砖项目投资?
  6. 站群服务器和普通服务器区别
  7. 谷歌piexl手机如何刷机 root
  8. 西安交通大学城市学院计算机课本,西安交通大学城市学院计算机系赴陕西科技大学进行专业课“课程思政”教学交流...
  9. 计算机网络技术电子工程,浅析计算机网络技术在电子信息工程中的应用
  10. 【827. 最大人工岛】