案例实操-Top10热门品类
需求1:分别统计每个品类的点击次数,下单次数和支付次数
(品类,点击总数)(品类,下单总数)(品类,支付总数)
排名顺序如:点击总数>下单总数>支付总数
方案一
def main(args: Array[String]): Unit = {//TODO : TOP10热门品类val sparkConf = new SparkConf().setMaster("local[*]").setAppName("HotCategoryTop10Analysis")val sc = new SparkContext(sparkConf)// 1.读取原始日志数据val actionRDD = sc.textFile("datas/user_visit_action.txt")// 2.统计品类的点击数量:(品类ID,点击数量)val clickActionRDD = actionRDD.filter(action => {val datas = action.split("_")datas(6) != "-1"})val clickCountRDD: RDD[(String, Int)] = clickActionRDD.map(action => {val datas = action.split("_")(datas(6), 1)}).reduceByKey(_ + _)// 3.统计品类的下单数量:(品类ID,下单数量)val orderActionRDD = actionRDD.filter(action => {val datas = action.split("_")datas(8) != "null"})//orderid => 1,2,3//[(1,1),(2,1),(3,1)]val orderCountRDD = orderActionRDD.flatMap(action => {val datas = action.split("_")val cid = datas(8)val cids = cid.split(",")cids.map(id => (id, 1))}).reduceByKey(_ + _)// 4.统计品类的支付数量:(品类ID,支付数量)val payActionRDD = actionRDD.filter(action => {val datas = action.split("_")datas(10) != "null"})//orderid => 1,2,3//[(1,1),(2,1),(3,1)]val payCountRDD = orderActionRDD.flatMap(action => {val datas = action.split("_")val cid = datas(10)val cids = cid.split(",")cids.map(id => (id, 1))}).reduceByKey(_ + _)// 5.将品类进行排序,并且取前10名// 点击数量排序,下单数量排序,支付数量排序// 元组排序:先比较第一个,再比较第二个,再比较第三个,依次类推// (品类ID,(点击数量,下单数量,支付数量))// cogroup = connect + groupval cogroupRDD: RDD[(String, (Iterable[Int], Iterable[Int], Iterable[Int]))] =clickCountRDD.cogroup(orderCountRDD, payCountRDD)val analysisRDD = cogroupRDD.mapValues {case (clickIter, orderIter, payIter) => {var clickCnt = 0var iter1 = clickIter.iteratorif (iter1.hasNext) {clickCnt = iter1.next()}var orderCnt = 0var iter2 = orderIter.iteratorif (iter2.hasNext) {orderCnt = iter2.next()}var payCnt = 0var iter3 = payIter.iteratorif (iter3.hasNext){payCnt = iter3.next()}(clickCnt, orderCnt, payCnt)}}val resultRDD = analysisRDD.sortBy(_._2, false).take(10)// 6.将结果采集到控制台打印出来resultRDD.foreach(println)sc.stop()}
方案二
def main(args: Array[String]): Unit = {//TODO : TOP10热门品类val sparkConf = new SparkConf().setMaster("local[*]").setAppName("HotCategoryTop10Analysis")val sc = new SparkContext(sparkConf)// Q : actionRDD重复使用// Q : cogroup性能可能较低// 1.读取原始日志数据val actionRDD = sc.textFile("datas/user_visit_action.txt")actionRDD.cache()// 2.统计品类的点击数量:(品类ID,点击数量)val clickActionRDD = actionRDD.filter(action => {val datas = action.split("_")datas(6) != "-1"})val clickCountRDD: RDD[(String, Int)] = clickActionRDD.map(action => {val datas = action.split("_")(datas(6), 1)}).reduceByKey(_ + _)// 3.统计品类的下单数量:(品类ID,下单数量)val orderActionRDD = actionRDD.filter(action => {val datas = action.split("_")datas(8) != "null"})val orderCountRDD = orderActionRDD.flatMap(action => {val datas = action.split("_")val cid = datas(8)val cids = cid.split(",")cids.map(id => (id, 1))}).reduceByKey(_ + _)// 4.统计品类的支付数量:(品类ID,支付数量)val payActionRDD = actionRDD.filter(action => {val datas = action.split("_")datas(10) != "null"})val payCountRDD = orderActionRDD.flatMap(action => {val datas = action.split("_")val cid = datas(10)val cids = cid.split(",")cids.map(id => (id, 1))}).reduceByKey(_ + _)// (品类ID,点击数量) => (品类ID,(点击数量,0,0))// (品类ID,下单数量) => (品类ID,(0,下单数量,0))// (品类ID,支付数量) => (品类ID,(0,0,支付数量))// (品类ID,(点击数量,下单数量,支付数量))// 5.将品类进行排序,并且取前10名// 点击数量排序,下单数量排序,支付数量排序// 元组排序:先比较第一个,再比较第二个,再比较第三个,依次类推// (品类ID,(点击数量,下单数量,支付数量))val rdd1 = clickCountRDD.map {case (cid, cnt) => {(cid, (cnt, 0, 0))}}val rdd2 = orderCountRDD.map {case (cid, cnt) => {(cid, (0, cnt, 0))}}val rdd3 = payCountRDD.map {case (cid, cnt) => {(cid, (0, 0, cnt))}}// 将三个数据源合并在一起,统一进行聚合计算val soruceRDD: RDD[(String, (Int, Int, Int))] = rdd1.union(rdd2).union(rdd3)val analysisRDD = soruceRDD.reduceByKey((t1, t2) => {(t1._1 + t2._1, t1._2 + t2._2, t1._3 + t2._3)})val resultRDD = analysisRDD.sortBy(_._2, false).take(10)// 6.将结果采集到控制台打印出来resultRDD.foreach(println)sc.stop()}
方案三
def main(args: Array[String]): Unit = {//TODO : TOP10热门品类val sparkConf = new SparkConf().setMaster("local[*]").setAppName("HotCategoryTop10Analysis")val sc = new SparkContext(sparkConf)// Q : 存在大量的shuffle操作(reduceByKey)// reduceByKey 聚合算子;spark会提供优化,缓存// 1.读取原始日志数据val actionRDD = sc.textFile("datas/user_visit_action.txt")// 2.将数据转换结构// 点击的场合:(品类ID,(1,0,0))// 下单的场合:(品类ID,(0,1,0))// 支付的场合:(品类ID,(0,0,1))val flatRDD: RDD[(String, (Int, Int, Int))] = actionRDD.flatMap(action => {val datas = action.split("_")if (datas(6) != "-1") {//点击的场合List((datas(6), (1, 0, 0)))} else if (datas(8) != "null") {//下单的场合val ids = datas(8).split(",")ids.map(id => (id, (0, 1, 0)))} else if (datas(10) != "null") {//支付的场合val ids = datas(10).split(",")ids.map(id => (id, (0, 0, 1)))} else {Nil}})// 3.将相同的品类ID的数据进行分组聚合// (品类ID,(点击数量,下单数量,支付数量))val analysisRDD = flatRDD.reduceByKey((t1, t2) => {(t1._1 + t2._1, t1._2 + t2._2, t1._3 + t2._3)})// 4.将统计结构根据数量进行降序处理,取前10名val resultRDD = analysisRDD.sortBy(_._2, false).take(10)// 5.将结果采集到控制台打印出来resultRDD.foreach(println)sc.stop()}
方案四
def main(args: Array[String]): Unit = {//TODO : TOP10热门品类val sparkConf = new SparkConf().setMaster("local[*]").setAppName("HotCategoryTop10Analysis")val sc = new SparkContext(sparkConf)// Q : 存在大量的shuffle操作(reduceByKey)// reduceByKey 聚合算子;spark会提供优化,缓存// 1.读取原始日志数据val actionRDD = sc.textFile("datas/user_visit_action.txt")val acc = new HotCategoryAccmulatorsc.register(acc, "hotCategory")// 2.将数据转换结构actionRDD.foreach(action => {val datas = action.split("_")if (datas(6) != "-1") {//点击的场合acc.add((datas(6), "click"))} else if (datas(8) != "null") {//下单的场合val ids = datas(8).split(",")ids.foreach(id => {acc.add((id, "order"))})} else if (datas(10) != "null") {//支付的场合val ids = datas(10).split(",")ids.foreach(id => {acc.add((id, "pay"))})} else {Nil}})val accVal: mutable.Map[String, HotCategory] = acc.valueval categories: mutable.Iterable[HotCategory] = accVal.map(_._2)val sort = categories.toList.sortWith((left, right) => {if (left.clickCnt > right.clickCnt) {true} else if (left.clickCnt == right.clickCnt) {if (left.orderCnt > right.orderCnt) {true} else if (left.orderCnt == right.orderCnt) {left.payCnt > right.payCnt} else {false}}else {false}})// 5.将结果采集到控制台打印出来sort.take(10).foreach(println)sc.stop()}case class HotCategory(cid: String, var clickCnt: Int, var orderCnt: Int, var payCnt: Int) {}/*** 自定义累加器* 1.继承AccumulatorV2,定义泛型* IN:(品类ID,行为类型)* OUT: mutable.Map[String,HotCategory]* 2.重写方法(6)*/class HotCategoryAccmulator extends AccumulatorV2[(String, String), mutable.Map[String, HotCategory]] {private val hcMap = mutable.Map[String, HotCategory]()override def isZero: Boolean = {hcMap.isEmpty}override def copy(): AccumulatorV2[(String, String), mutable.Map[String, HotCategory]] = {new HotCategoryAccmulator()}override def reset(): Unit = {hcMap.clear()}override def add(v: (String, String)): Unit = {val cid = v._1val actionType = v._2val category: HotCategory = hcMap.getOrElse(cid, HotCategory(cid, 0, 0, 0))if (actionType == "click") {category.clickCnt += 1} else if (actionType == "order") {category.orderCnt += 1} else if (actionType == "pay") {category.payCnt += 1}hcMap.update(cid, category)}override def merge(other: AccumulatorV2[(String, String), mutable.Map[String, HotCategory]]): Unit = {val map1 = this.hcMapval map2 = other.valuemap2.foreach {case (cid, hc) => {val category: HotCategory = map1.getOrElse(cid, HotCategory(cid, 0, 0, 0))category.clickCnt += hc.clickCntcategory.orderCnt += hc.orderCntcategory.payCnt += hc.payCntmap1.update(cid, category)}}}override def value: mutable.Map[String, HotCategory] = hcMap}
需求2:在需求一的基础上,增加每个品类用户 session 的点击统计
def main(args: Array[String]): Unit = {//TODO : TOP10热门品类val sparkConf = new SparkConf().setMaster("local[*]").setAppName("HotCategoryTop10Analysis")val sc = new SparkContext(sparkConf)val actionRDD = sc.textFile("datas/user_visit_action.txt")actionRDD.cache()val top10Ids = top10Category(actionRDD)// 1.过滤原始数据,保留点击和前10品类IDval filterActionRDD = actionRDD.filter(action => {val datas = action.split("_")if (datas(6) != "-1") {top10Ids.contains(datas(6))} else {false}})//2.根据品类ID和sessionid进行点击量的统计val reduceRDD: RDD[((String, String), Int)] = filterActionRDD.map(action => {val datas = action.split("_")((datas(6), datas(2)), 1)}).reduceByKey(_ + _)//3.将统计的结果进行结构的转换//((品类ID,sessionId),sum) => (品类ID,(sessionId,sum))val mapRDD = reduceRDD.map {case ((cid, sid), sum) => {(cid, (sid, sum))}}//4.相同的品类进行分组val groupRDD: RDD[(String,Iterable[(String,Int)])] = mapRDD.groupByKey()//5.将分组后的数据进行点击量的排序,取前10名val resultRDD = groupRDD.mapValues(iter => {iter.toList.sortBy(_._2)(Ordering.Int).take(10)})resultRDD.collect().foreach(println)sc.stop()}def top10Category(actionRDD: RDD[String]) = {val flatRDD: RDD[(String, (Int, Int, Int))] = actionRDD.flatMap(action => {val datas = action.split("_")if (datas(6) != "-1") {//点击的场合List((datas(6), (1, 0, 0)))} else if (datas(8) != "null") {//下单的场合val ids = datas(8).split(",")ids.map(id => (id, (0, 1, 0)))} else if (datas(10) != "null") {//支付的场合val ids = datas(10).split(",")ids.map(id => (id, (0, 0, 1)))} else {Nil}})val analysisRDD = flatRDD.reduceByKey((t1, t2) => {(t1._1 + t2._1, t1._2 + t2._2, t1._3 + t2._3)})analysisRDD.sortBy(_._2, false).take(10).map(_._1)}
案例实操-Top10热门品类相关推荐
- 大数据之Spark案例实操完整使用(第六章)
大数据之Spark案例实操完整使用 一.案例一 1.准备数据 2.需求 1:Top10 热门品类 3.需求说明 方案一. 实现方案二 实现方案三 二 .需求实现 1.需求 2:Top10 热门品类中每 ...
- 【报告分享】见实私域流量白皮书:私域流量案例实操手册.pdf
大家好,我是文文(微信:sscbg2020),今天给大家分享见实科技于2020年10月份发布的报告<见实私域流量白皮书:私域流量案例实操手册.pdf>. 本报告共73页,包含如下鞋服.餐饮 ...
- 新硬盘挂载-fdisk+mount案例实操
新硬盘挂载-fdisk+mount案例实操 作者:尹正杰 版权声明:原创作品,谢绝转载!否则将追究法律责任. 现在很多服务器都支持热插拔了,当有新的硬盘插入到服务器上我们需要将其分区,格式化,然后挂载 ...
- 数据模型同学看过来|代码案例实操来袭
去年年底,央行开出反洗钱罚单,多家银行合计被罚1040万元. 当时,中国人民银行石家庄中心支行披露的反洗钱行政处罚信息公示表显示,因涉及未按照规定履行客户身份识别义务等,中行.邮储.浦发三家银行以及阳 ...
- 自定义OutputFormat案例实操
自定义OutputFormat案例实操 文章目录 1)需求 2)需求分析 3)编程实现 1.创建Mapper类 2.创建Reducer类 3.创建OutputFormat类 4.创建RecordWri ...
- MapReduce入门(一)—— MapReduce概述 + WordCount案例实操
MapReduce入门(一)-- MapReduce概述 文章目录 MapReduce入门(一)-- MapReduce概述 1.1 MapReduce 定义 1.2 MapReduce 优缺点 1. ...
- Azkaban配置Work Flow案例实操
Work Flow案例实操 目录 Work Flow案例实操 1. Yarm语法简介 2. HelloWorld案例 3. JavaProcess案例 4. 作业依赖案例 5. 自动失败重试案例 6. ...
- 航空专场 | 无人机设计仿真流程讲解与案例实操
一.CFD在无人机上的应用 1.静.动气动系数计算以上介绍的无人机的流动状态一般为中低雷诺数,不可压缩流动.这些计算一般用S-A模型或者KW-SST模型进行计算,能够获得不错的工程精度.静.动气动力系 ...
- 大数据培训课程数据清洗案例实操-简单解析版
数据清洗(ETL) 在运行核心业务MapReduce程序之前,往往要先对数据进行清洗,清理掉不符合用户要求的数据.清理的过程往往只需要运行Mapper程序,不需要运行Reduce程序.大数据培训 数据 ...
最新文章
- 开源 | IBM、哈佛共同研发:Seq2Seq模型可视化工具
- 双目视觉惯性里程计的在线初始化与自标定算法
- 《JavaScript DOM 编程艺术》 读书笔记
- php实现tail,Linux-如何用php实现Linux下的tail -f命令?
- 孔板流量计计算公式_带你全面了解各种流量计
- html调用天气预报wsdl服务,webservice接口调用天气预报例程
- 浅谈JavaScript继承
- 每个Xcode开发者应该知道的七个使用技巧
- MyBatis 3 – Spring集成教程
- sqlserver2008驱动_Python连接数据库两种方法,QSqlDatabase,pymmsql,驱动名
- python 日历查询系统_python 日历
- 12款很棒的浏览器兼容性测试工具推荐
- 武汉大学计算机学院 招聘院长,黄传河任武汉大学计算机学院执行副院长 主持工作...
- vue.js 事件的案例以及 v-model 的学习
- sqlalchemy mysql配置中怎么设置utf8_python – 使用SQLAlchemy和pymysql,如何设置连接以使用utf8mb4?...
- 那些你需要知道的互联网广告投放知识
- “积微者速成”与敏捷实践
- 灵活组合复合图元模板,电路图状态图业务定义可配置,浮云E绘图软件源码开发
- 剖析Unreal Engine超真实人类的渲染技术Part 2 - 眼球渲染
- 获取抖音视频信息api
热门文章
- 2B码农想转做产品,这的规划是否靠谱?
- 在CentOS4上安装JMagick
- 面试:输出循环小数的循环节
- [转]AAuto编程语言官方站 网站服务条款
- IOS之Autorotation and Autosizing
- 【深度学习】深度学习分类与模型评估
- Monto Carlo估计动作价值(action values)
- 自定义文件系统下的磁盘访问次数计算
- tf.nn.bidirectional_dynamic_rnn()函数详解
- [转]STL(容器)与DEBUGNEW运算符冲突的解决