大数据之电商分析系统(一)

一:项目介绍

​ 本项目来源于企业级电商网站的大数据统计分析平台, 该平台以 Spark 框架为核心, 对电商网站的日志进行离线和实时分析。该大数据分析平台对电商网站的各种用户行为( 访问行为、购物行为、广告点击行为等)进行分析,根据平台统计出来的数据, 辅助公司中的 PM(产品经理)、数据分析师以及管理人员分析现有产品的情况, 并根据用户行为分析结果持续改进产品的设计,以及调整公司的战略和业务。最终达到用大数据技术来帮助提升公司的业绩、营业额以及市场占有率的目标。

​ 本项目使用了 Spark 技术生态栈中最常用的三个技术框架, Spark Core、SparkSQL 和 Spark Streaming,进行离线计算和实时计算业务模块的开发。实现了包括用户访问 session 分析、页面单跳转化率统计、热门商品离线统计、广告流量实时统计4 个业务模块。通过合理的将实际业务模块进行技术整合与改造, 该项目几乎完全涵盖了 Spark Core、Spark SQL 和 Spark Streaming 这三个技术框架中大部分的功能点、知识点,学员对于 Spark 技术框架的理解将会在本项目中得到很大的提高。

二:项目框架

​ 1. 项目整体框架


​ 本项目分为离线分析系统与实时分析系统两大模块。

​ 在离线分析系统中,我们将模拟业务数据写入 Hive 表中,离线分析系统从Hive 中获取数据,并根据实际需求(用户访问 Session 分析、页面单跳转化率分析、各区域热门商品统计) 对数据进行处理,最终将分析完毕的统计数据存储到MySQL 的对应表格中。
​ 在实时分析系统中,我们将模拟业务数据写入 Kafka 集群中, 实时分析系统从 Kafka broker 中获取数据, 通过 Spark Streaming 的流式处理对广告点击流量进行实时分析,最终将统计结果存储到 MySQL 的对应表格中。

  1. 离线日志采集流程

  2. 实时日志采集流程

  1. 离线/实时日志采集框架

三:数据分析

  1. 离线数据分析

    用户访问行为表(user_visit_action) —>存放网站或者 APP每天的点击流数据,就是用户对网站/APP 每点击一下, 就会产生一条存放在这个表里面的数据。

    用户基本信息表(user_info) ----->是一张普通的用户基本信息表;这张表中存放了网站/APP 所有注册用户的基本信息.

    商品信息表(product_info) ----->是一张普通的商品基本信息表; 这张表中存放了网站/APP所有商品的基本信息。

  2. 在线数据分析

    程序每 5 秒向 Kafka 集群写入数据

    格式 : timestamp province city userid adid

四、项目需求分析

  1. 用户访问session统计

    ​ 用户在电商网站上, 通常会有很多的访问行为,通常都是进入首页, 然后可能点击首页上的一些商品,点击首页上的一些品类,也可能随时在搜索框里面搜索关键词,还可能将一些商品加入购物车,对购物车中的多个商品下订单,最后对订单中的多个商品进行支付。用户的每一次操作,其实可以理解为一个 action,在本项目中,我们关注点击、搜索、 下单、 支付这四个用户行为。
    ​ 用户 session, 是在电商平台的角度定义的会话概念, 指的就是, 从用户第一次进入首页, session 就开始了。 然后在一定时间范围内, 直到最后操作完( 可能做了几十次、甚至上百次操作),离开网站, 关闭浏览器,或者长时间没有做操作, 那么 session 就结束了。以上用户在网站内的访问过程, 就称之为一次 session。 简单理解, session就是某一天某一个时间段内, 某个用户对网站从打开/进入, 到做了大量操作,到最后关闭浏览器。的过程,就叫做 session。session 实际上就是一个电商网站中最基本的数据和大数据。那么面向消费者/用户端的大数据分析( C 端), 最基本的就是面向用户访问行为/用户访问 session 的分析。该模块主要是对用户访问 session 进行统计分析,包括 session 聚合指标计算、 按时间比例随机抽取 session、 获取每天点击、 下单和购买排名前 10 的品类、 并获取 top10 品类中排名前 10的 session。该模块可以让产品经理、数据分析师以及企业管理层形象地看到各种条件下的具体用户行为以及统计指标,从而对公司的产品设计以及业务发展战略做出调整。主要使用 Spark Core 实现。

    1. 页面单跳转换率统计

    ​ 该模块主要是计算关键页面之间的单步跳转转化率,涉及到页面切片算法以及页面流匹配算法。该模块可以让产品经理、数据分析师以及企业管理层看到各个关键页面之间的转化率,从而对网页布局,进行更好的优化设计。主要使用SparkCore实现。

  2. 区域热门统计

    ​ 该模块主要实现每天统计出各个区域的 top3 热门商品。 该模块可以让企业管理层看到电商平台在不同区域出售的商品的整体情况, 从而对公司的商品相关的战略进行调整。主要使用 Spark SQL 实现。

  3. 广告流量实时统计

    ​ 网站 / app 中经常会给第三方平台做广告,这也是一些互联网公司的核心收入来源;当广告位招商完成后,广告会在网站 / app 的某个广告位发布出去,当用户访问网站 / app 的时候, 会看到相应位置的广告, 此时, 有些用户可能就会去点击那个广告。我们要获取用户点击广告的行为,并针对这一行为进行计算和统计。
    ​ 用户每次点击一个广告以后,会产生相应的埋点日志;在大数据实时统计系统中,会通过某些方式将数据写入到分布式消息队列中( Kafka)。日志发送给后台 web 服务器( nginx), nginx 将日志数据负载均衡到多个Tomcat 服务器上, Tomcat 服务器会不断将日志数据写入 Tomcat 日志文件中,写入后,就会被日志采集客户端(比如 flume agent)所采集,随后写入到消息队列中( kafka),我们的实时计算程序会从消息队列中( kafka)去实时地拉取数据,然后对数据进行实时的计算和统计。
    ​ 这个模块的意义在于, 让产品经理、高管可以实时地掌握到公司打的各种广告的投放效果。以便于后期持续地对公司的广告投放相关的战略和策略,进行调整和优化;以期望获得最好的广告收益。该模块负责实时统计公司的广告流量, 包括广告展现流量和广告点击流量。 实现动态黑名单机制, 以及黑名单过滤; 实现滑动窗口内的各城市的广告展现流量和广告点击流量的统计; 实现每个区域每个广告的点击流量实时统计;实现每个区域 top3 点击量的广告的统计。主要使用 Spark Streaming 实现。

五:项目需求具体实现

  1. Session各范围访问步长、访问时长占比统计

    ​ 需求一要统计出符合筛选条件的 session 中,访问时长在 1s3s、4s6s、7s9s、10s30s、30s60s、1m3m、3m10m、10m30m、30m 以上各个范围内的 session占比;访问步长在 13、46、79、1030、30~60、60 以上各个范围内的 session占比,并将结果保存到 MySQL 数据库中。

    ​ 在计算之前需要根据查询条件筛选 session,查询条件比如搜索过某些关键词的用户、访问时间在某个时间段内的用户、年龄在某个范围内的用户、职业在某个范围内的用户、所在某个城市的用户,发起的 session。找到对应的这些用户的 session,并进行统计, 之所以需要有筛选主要是可以让使用者, 对感兴趣的和关系的用户群体,进行后续各种复杂业务逻辑的统计和分析,那么拿到的结果数据,就是只是针对特殊用户群体的分析结果;而不是对所有用户进行分析的泛泛的分析结果。比如说,现在某个企业高层,就是想看到用户群体中, 28~35 岁的,老师职业的群体, 对应的一些统计和分析的结果数据,从而辅助高管进行公司战略上的决策制定。

    ​ session 访问时长,也就是说一个 session 对应的开始的 action,到结束的 action,之间的时间范围;还有,就是访问步长,指的是,一个 session 执行期间内,依次点击过多少个页面,比如说,一次 session,维持了 1 分钟, 那么访问时长就是 1m,然后在这 1 分钟内,点击了 10 个页面, 那么 session 的访问步长,就是 10.

    ​ 比如说,符合第一步筛选出来的 session 的数量大概是有 1000 万个。那么里面,我们要计算出,访问时长在 1s~3s 内的 session 的数量,并除以符合条件的总 session数量( 比如 1000 万),比如是 100 万/1000 万,那么 1s~3s 内的 session 占比就是 10%。依次类推,这里说的统计,就是这个意思。

    ​ 这个功能可以让人从全局的角度看到,符合某些条件的用户群体,使用我们的产品的一些习惯。比如大多数人,到底是会在产品中停留多长时间, 大多数人,会在一次使用产品的过程中,访问多少个页面。那么对于使用者来说, 有一个全局和清晰的认识。

  2. Session随机抽取

    ​ 在符合条件的 session 中,按照时间比例随机抽取 1000 个 session这个按照时间比例是什么意思呢?随机抽取本身是很简单的,但是按照时间比例,就很复杂了。比如说,这一天总共有 1000 万的 session。那么我现在总共要从这 1000 万 session 中,随机抽取出来 1000 个 session。但是这个随机不是那么简单的。需要做到如下几点要求:首先,如果这一天的 12:00~13:00 的 session 数量是 100万,那么这个小时的 session 占比就是 1/10,那么这个小时中的 100 万的 session,我们就要抽取 1/10 * 1000 = 100 个。然后再从这个小时的 100 万 session 中,随机抽取出 100 个 session。以此类推,其他小时的抽取也是这样做。

    ​ 这个功能的作用,是说,可以让使用者,能够对于符合条件的 session,按照时间比例均匀的随机采样出 1000 个 session,然后观察每个 session 具体的点击流/行为, 比如先进入了首页、然后点击了食品品类、然后点击了雨润火腿肠商品、然后搜索了火腿肠罐头的关键词、接着对王中王火腿肠下了订单、最后对订单做了支付。

    之所以要做到按时间比例随机采用抽取,就是要做到,观察样本的公平性。

    抽取完毕之后,需要将 Session 的相关信息和详细信息保存到 MySQL 数据库中。

    数据源解析:

    Session聚合数据 :AggrInfo 和 Session用户访问数据 UserVisitAction

  3. Top10热门品类

    在符合条件的 session 中,获取点击、下单和支付数量排名前 10 的品类。

    ​ 数据中的每个 session 可能都会对一些品类的商品进行点击、下单和支付等等行为, 那么现在就需要获取这些 session 点击、下单和支付数量排名前 10 的最热门的品类。也就是说,要计算出所有这些 session 对各个品类的点击、下单和支付的次数, 然后按照这三个属性进行排序,获取前 10 个品类。

    ​ 这个功能,很重要,就可以让我们明白, 就是符合条件的用户, 他最感兴趣的商品是什么种类。这个可以让公司里的人, 清晰地了解到不同层次、不同类型的用户的心理和喜好。计算完成之后, 将数据保存到 MySQL 数据库中。

    数据源解析:

    用户访问数据表: UserVisitAction

  4. Top10热门品类Top10活跃Session****统计

    对于排名前 10 的品类,分别获取其点击次数排名前 10 的 session。

    ​ 这个就是说, 对于 top10 的品类, 每一个都要获取对它点击次数排名前 10 的session。

    这个功能,可以让我们看到,对某个用户群体最感兴趣的品类, 各个品类最感兴趣最典型的用户的 session 的行为。计算完成之后,将数据保存到 MySQL 数据库中。

    数据源解析:

    用户访问数据表: UserVisitAction

需求1-4的具体代码实现
UserVisitSessionAnalyze
package com.ityouxin.sessionimport java.util.{Date, UUID}import com.ityouxin.commons.conf.ConfigurationManager
import com.ityouxin.commons.constant.Constants
import com.ityouxin.commons.model.{UserInfo, UserVisitAction}
import com.ityouxin.commons.utils.{DateUtils, NumberUtils, ParamUtils, StringUtils, ValidUtils}
import com.ityouxin.session.DataModel.{SessionAggrState, SessionDetail, Top10Category, Top10Session}
import com.ityouxin.session.UserVisitSessionAnalyze.{getClickCategoryIdCountRDD, getOrderCategoryIdCountRDD, getPayCategoryIdCountRDD}
import org.apache.spark.rdd.RDD
import org.apache.spark.{SparkConf, SparkContext}
import org.apache.spark.sql.{Dataset, SaveMode, SparkSession}
import net.sf.json.JSONObject
import org.apache.spark.storage.StorageLevelimport scala.collection.mutable
import scala.collection.mutable.{ArrayBuffer, ListBuffer}
import scala.util.Randomobject UserVisitSessionAnalyze {def main(args: Array[String]): Unit = {//初始化scval conf: SparkConf = new SparkConf().setMaster("local[*]").setAppName("UserVisitSessionAnalyze")//初始化SparkSessionval spark = SparkSession.builder().config(conf).enableHiveSupport().getOrCreate()//获取scval sc: SparkContext = spark.sparkContext//根据配置工具类ConfigurationManager来获取config,获取任务配置val jsonStr = ConfigurationManager.config.getString("task.params.json")//将获取到的配置String转换成json格式,便于传递val taskParm: JSONObject = JSONObject.fromObject(jsonStr)//查询user_visit_action表中的数据(按照日期范围)val userVisitActionRDD: RDD[UserVisitAction] = getActionRDDByDateRange(spark,taskParm)//println(userVisitActionRDD.collect().mkString("/r/n"))//将用户行为信息转换为k-v元组val sessionidActionRDD: RDD[(String, UserVisitAction)] = userVisitActionRDD.map(uva => {(uva.session_id, uva)})//缓存数据sessionidActionRDD.persist(StorageLevel.MEMORY_ONLY)//聚合解析写个方法进行分组、计算等操作val sessionAggrInfoRDD: RDD[(String, String)] = aggregateBySession(spark,sessionidActionRDD)//println(sessionidActionRDD.collect().mkString("\r\n"))/*进行下一步操作,计算步长和时长,累加更新session访问数、session访问时长范围、session访问步长范围,需要用到累加器进行对数据得叠加*///创建一个累加器,进行计算每个excultor中的数据计算val sessionAggrStateAccumulator = new SessionAggrStateAccumulator//累加器需要注册,才能使用sc.register(sessionAggrStateAccumulator,"sessionAggrStateAccumulator")//根据查询条件过滤用户行为数据,并且同时累加时长范围和步长访问的数据val filteredSessionAggrInfoRDD: RDD[(String, String)] = filterSessionAndAggrState(sessionAggrInfoRDD:RDD[(String,String)],taskParm,sessionAggrStateAccumulator)//缓存filteredSessionAggrInfoRDD.persist(StorageLevel.MEMORY_ONLY)//打印过滤后的数据filteredSessionAggrInfoRDD.foreach(println)//统计各个时长范围和步长范围ide占比val taskUUID = UUID.randomUUID().toString//需求一:计算所有符合条件的数据各个范围的占比,保存到mysql数据库中sessionAggrStateAccumulator.valuecalculateAndPersistAggrState(spark,sessionAggrStateAccumulator.value,taskUUID)val sessionDetailRDD: RDD[(String, UserVisitAction)] = getSessionDetailRDD(sessionidActionRDD,filteredSessionAggrInfoRDD)sessionDetailRDD.persist(StorageLevel.MEMORY_ONLY)// println(sessionDetailRDD.collect().mkString("\r\n"))//需求二:随机均匀抽取样本sessionrandRomExtractSession(spark,taskUUID,filteredSessionAggrInfoRDD,sessionDetailRDD)//需求三:求Top的热门品类val top10Categories: Array[(CategorySortKey, String)] = getTop10Category(spark,taskUUID,sessionDetailRDD)println(top10Categories.mkString("\r\n"))//需求四:获取top10品类的活跃top10的sessiongetTop10CategoryToTop10Session(spark,taskUUID,top10Categories,sessionDetailRDD)//释放资源spark.close()}//需求四:统计top10中每个品类的前10的sessiondef getTop10CategoryToTop10Session(spark: SparkSession,taskUUID: String,top10Categories: Array[(CategorySortKey, String)],sessionDetailRDD: RDD[(String, UserVisitAction)]) = {//对top10的品类进行数据处理   将其转换为(caregoryid,categoryid)val top10CategoryIds: Array[(Long, Long)] = top10Categories.map {case (csk, line) =>//获取categoryidval categoryid = StringUtils.getFieldFromConcatString(line, "\\|", Constants.FIELD_CATEGORY_ID)(categoryid.toLong, categoryid.toLong)}top10CategoryIds//获取到top10的品类的RDDval top10CategoryIdRDD: RDD[(Long, Long)] = spark.sparkContext.makeRDD(top10CategoryIds)//将sessionDetail行为数据,按照sessionid进行聚合val sessionActionsRDD: RDD[(String, Iterable[UserVisitAction])] = sessionDetailRDD.groupByKey()//统计每个品类categoeyid下的每个session的点击次数val categorySessionCountRDD: RDD[(Long, String)] = sessionActionsRDD.flatMap {case (sessionid, uvas) =>val categoryIdCountMap = new mutable.HashMap[Long, Long]()//遍历Iterable[UserVisitAction])  迭代器  中的数据,取出用户的行为数据for (uva <- uvas) {//判断容器中的点击品类id是否为空,是否存在点击行为if (uva.click_category_id != null && uva.click_category_id != -1 && !categoryIdCountMap.contains(uva.click_category_id))//不为空并且不存在行为数据,则将点击品类id值为0categoryIdCountMap.put(uva.click_category_id, 0)if (uva.click_category_id != null && uva.click_category_id != -1) {//不为空  存在则更新容器中的idcategoryIdCountMap.update(uva.click_category_id, categoryIdCountMap(uva.click_category_id) + 1)}}//对categoryCountMap数据进行格式转换//遍历map容器中的数据 拼接出需要的字符串 yield 注意 在for循环中使用/*yield的用法总结针对每一次 for 循环的迭代, yield 会产生一个值,被循环记录下来 (内部实现上,像是一个缓冲区).当循环结束后, 会返回所有 yield 的值组成的集合.返回集合的类型与被遍历的集合类型是一致的.*/for ((cid, count) <- categoryIdCountMap)yield (cid, sessionid + "," + count)}//统计所有的品类所有的session下的点击次数 categorySessionCountRDD//将两个RDD进行join操作 取导我们想要的RDD【(categoryid,sessionid+ “,” + count)】val top10CategoryJoinRDD: RDD[(Long, (Long, String))] = top10CategoryIdRDD.join(categorySessionCountRDD)//统计top10热门品类的每个品类的每个session的点击次数val top10CategorySessionCountRDD: RDD[(Long, String)] = top10CategoryJoinRDD.map {case (cid, (cid2, line)) =>(cid, line)}//根据品类id进行分组val top10CategorySessionsCountRDD: RDD[(Long, Iterable[String])] = top10CategorySessionCountRDD.groupByKey()//对分组后的数据进行压平,然后进行排序  拼接val top10SessionObjRDD: RDD[Top10Session] = top10CategorySessionsCountRDD.flatMap {case (cid, clicks) =>//处理sessionid +"," + count  这个字符串val top10Session = clicks.toList.sortWith(//转换为List后  可以使用sortwith进行排序  sortWith(排序函数)//_.split(“,”)(1) > _.split(",")(1)(x: String, y: String) => {x.split(",")(1) > y.split(",")(1)}).take(10)top10Session.map{case line =>//得到sessionid和countval sessionid = line.split(",")(0)val count = line.split(",")(1).toLong//封装样例类Top10Session(taskUUID,cid,sessionid,count)}}import spark.implicits._//将RDD 转换形成DFtop10SessionObjRDD.toDF().write.format("jdbc").option("url",ConfigurationManager.config.getString(Constants.JDBC_URL)).option("user",ConfigurationManager.config.getString(Constants.JDBC_USER)).option("password",ConfigurationManager.config.getString(Constants.JDBC_PASSWORD)).option("dbtable","top10_session").mode(SaveMode.Append).save()//将top10Session对应的用户行为详情写入到数据库//将join后的RDD转换格式val top10SessionRDD: RDD[(String, String)] = top10SessionObjRDD.map {item => (item.sessionid, item.sessionid)}//与用户session详情RDD与转换后的top10Session join操作val top10SessionDetailActionRDD: RDD[(String, (String, UserVisitAction))] = top10SessionRDD.join(sessionDetailRDD)val top10SessionDetail: RDD[SessionDetail] = top10SessionDetailActionRDD.map {case (sid, (sid2, uva)) =>SessionDetail(taskUUID, uva.user_id, uva.session_id, uva.page_id, uva.action_time,uva.search_keyword, uva.click_category_id, uva.click_product_id, uva.order_category_ids,uva.order_product_ids, uva.pay_category_ids, uva.pay_product_ids)}top10SessionDetail.toDF().write.format("jdbc").option("url",ConfigurationManager.config.getString(Constants.JDBC_URL)).option("user",ConfigurationManager.config.getString(Constants.JDBC_USER)).option("password",ConfigurationManager.config.getString(Constants.JDBC_PASSWORD)).option("dbtable","session_detail").mode(SaveMode.Append).save()}//计算点击类别的次数  需要进行筛选  将null和-1 去掉def getClickCategoryIdCountRDD(sessionDetailRDD: RDD[(String, UserVisitAction)]) = {val clickActionRDD: RDD[(String, UserVisitAction)] = sessionDetailRDD.filter {case (sessionid, uva) =>uva.click_category_id != null && uva.click_category_id != -1}val clickCategoryIdRDD: RDD[(Long, Long)] = clickActionRDD.map {case (sessionid, uva) =>(uva.click_category_id, 1L)}clickCategoryIdRDD.reduceByKey(_+_)}//计算每个品类的下单次数  也需要进行过滤掉空值def getOrderCategoryIdCountRDD(sessionDetailRDD: RDD[(String, UserVisitAction)]) = {val ordderAction: RDD[(String, UserVisitAction)] = sessionDetailRDD.filter {case (sessionid, uva) =>uva.order_category_ids != null}//对符合条件的下单进行扁平处理,组成一个新的RDDval orderCategoryIdRDD: RDD[(Long, Long)] = ordderAction.flatMap {case (sessionid, uva) =>uva.order_category_ids.split(",").map( item=>(item.toLong, 1L))}//+orderCategoryIdRDD.reduceByKey(_+_)}//支付次数,与上述类似def getPayCategoryIdCountRDD(sessionDetailRDD: RDD[(String, UserVisitAction)]) = {val payAction: RDD[(String, UserVisitAction)] = sessionDetailRDD.filter {case (sessionid, uva) =>uva.pay_category_ids != null}val payCategoryIdRDD: RDD[(Long, Long)] = payAction.flatMap {case (sessionid, uva) =>uva.pay_category_ids.split(",").map(item=>(item.toLong, 1L))}payCategoryIdRDD.reduceByKey(_+_)}//进行join处理,为排序做准备def joinCategoryAndDate(distinctCategoryIddRDD: RDD[(Long, Long)],clickCategoryIdCountRDD: RDD[(Long, Long)],orderCategoryIdCountRDD: RDD[(Long, Long)],payCategoryIdCountRDD: RDD[(Long, Long)]) = {//join操作后,拼接需要的RDDval clickJoinRDD: RDD[(Long, String)] = distinctCategoryIddRDD.leftOuterJoin(clickCategoryIdCountRDD).map {case (categroyid, (cid, optionValue)) =>val clickCount = if (optionValue.isDefined) optionValue.get else 0Lval value = Constants.FIELD_CATEGORY_ID + "=" + categroyid + "|" + Constants.FIELD_CLICK_COUNT + "=" + clickCount(categroyid, value)}//拼接出orderJoinRDDval orderJoinRDD: RDD[(Long, String)] = clickJoinRDD.leftOuterJoin(orderCategoryIdCountRDD).map {case (categroyid, (oldvalue, optionValue)) =>val orderCount = if (optionValue.isDefined) optionValue.get else 0Lval value = oldvalue+ "|" + Constants.FIELD_CATEGORY_ID + "=" + categroyid + "|" + Constants.FIELD_ORDER_COUNT + "=" + orderCount(categroyid, value)}//拼接支付Join后的RDDval payJoinRDD: RDD[(Long, String)] = orderJoinRDD.leftOuterJoin(payCategoryIdCountRDD).map {case (categroyid, (oldvalue, optionValue)) =>val payCount = if (optionValue.isDefined) optionValue.get else 0Lval value = oldvalue+ "|"+ Constants.FIELD_CATEGORY_ID + "=" + categroyid + "|" + Constants.FIELD_PAY_COUNT + "=" + payCount(categroyid, value)}payJoinRDD}//需求三:获取Top10的人们品类  点击量、下单量、支付量def getTop10Category(spark: SparkSession, taskUUID: String, sessionDetailRDD: RDD[(String, UserVisitAction)]) = {//获取所有产生点击 下单 支付行为的品类val categoryidRDD: RDD[(Long, Long)] = sessionDetailRDD.flatMap {case (sessionid, uva) =>val list = ArrayBuffer[(Long, Long)]()//点击行为的品类idif (uva.click_category_id != null && uva.click_category_id != -1) {list += ((uva.click_category_id, uva.click_category_id))}//下单行为的品类id  用户可能对应多个订单,ids  需要遍历if (uva.order_category_ids != null) {for (orderCid <- uva.order_category_ids.split(",")) {//将符合条件的id添加到容器list中list += ((orderCid.toLong, orderCid.toLong))}}//支付行为的品类idif (uva.pay_category_ids != null) {for (payCid <- uva.pay_category_ids.split(",")) {list += ((payCid.toLong, payCid.toLong))}}list}//一个用户可能多次访问一个类别,所以统计热门数量需要去重val distinctCategoryIddRDD = categoryidRDD.distinct()//计算每个品类的点击次数val clickCategoryIdCountRDD: RDD[(Long, Long)] = getClickCategoryIdCountRDD(sessionDetailRDD)//计算每个品类的下单次数val orderCategoryIdCountRDD: RDD[(Long, Long)] = getOrderCategoryIdCountRDD(sessionDetailRDD)//计算每个品类的支付次数val payCategoryIdCountRDD: RDD[(Long, Long)] = getPayCategoryIdCountRDD(sessionDetailRDD)//对计算后的点击次数 下单次数 支付次数 进行排序val joinCategoryRDD: RDD[(Long, String)] = joinCategoryAndDate(distinctCategoryIddRDD,clickCategoryIdCountRDD,orderCategoryIdCountRDD,payCategoryIdCountRDD)println(joinCategoryRDD.collect().mkString("\r\n"))//排序操作前准备val sortKeyCountRDD: RDD[(CategorySortKey, String)] = joinCategoryRDD.map {case (cid, line) =>//拼接出排序所需要的字段println(line)val clickCount = StringUtils.getFieldFromConcatString(line, "\\|", Constants.FIELD_CLICK_COUNT).toLongval orderCount = StringUtils.getFieldFromConcatString(line, "\\|", Constants.FIELD_ORDER_COUNT).toLongval payCount = StringUtils.getFieldFromConcatString(line, "\\|", Constants.FIELD_PAY_COUNT).toLong//封装排序样例类,进行处理数据(CategorySortKey(clickCount, orderCount, payCount), line)}//进行排序算法  降序val sortedKeyCountRDD: RDD[(CategorySortKey, String)] = sortKeyCountRDD.sortByKey(false)//前10行val top10Category: Array[(CategorySortKey, String)] = sortedKeyCountRDD.take(10)//排序后,再次拼接出符合保存mysql数据库的数据格式val top10CategoryArray: Array[Top10Category] = top10Category.map {case (categorySortKey, line) =>val categoryid = StringUtils.getFieldFromConcatString(line, "\\|", Constants.FIELD_CATEGORY_ID).toLongval clickCount = StringUtils.getFieldFromConcatString(line, "\\|", Constants.FIELD_CLICK_COUNT).toLongval orderCount = StringUtils.getFieldFromConcatString(line, "\\|", Constants.FIELD_ORDER_COUNT).toLongval payCount = StringUtils.getFieldFromConcatString(line, "\\|", Constants.FIELD_PAY_COUNT).toLong//样例类  进行数据拼接Top10Category(taskUUID, categoryid, clickCount, orderCount, payCount)}//转RDDval top10CategoryRDD = spark.sparkContext.parallelize(top10CategoryArray)import spark.implicits._//保存到数据库msyqltop10CategoryRDD.toDF().write.format("jdbc").option("url",ConfigurationManager.config.getString(Constants.JDBC_URL)).option("user",ConfigurationManager.config.getString(Constants.JDBC_USER)).option("password",ConfigurationManager.config.getString(Constants.JDBC_PASSWORD)).option("dbtable","top10_category").mode(SaveMode.Append).save()top10Category}//需求二:1  获取过滤后的用户行为详细数据def getSessionDetailRDD(sessionidActionRDD: RDD[(String, UserVisitAction)], filteredSessionAggrInfoRDD: RDD[(String, String)]) = {val tempRDD: RDD[(String, (String, UserVisitAction))] = filteredSessionAggrInfoRDD.join(sessionidActionRDD)tempRDD.map(item => (item._1,item._2._2))}//2. 随机均匀抽取样本def randRomExtractSession(spark: SparkSession,taskUUID: String,filteredSessionAggrInfoRDD: RDD[(String, String)],sessionDetailRDD: RDD[(String, UserVisitAction)]) = {//第一步  求每天每小时的sessionval timeSessionRDD: RDD[(String, String)] = filteredSessionAggrInfoRDD.map {case (sessionid, aggrInfo) =>val startTime = StringUtils.getFieldFromConcatString(aggrInfo, "\\|", Constants.FIELD_START_TIME)println(startTime)//将开始时间的格式转换为yyyy-MM-dd_HHval dateHour = DateUtils.getDateHour(startTime)(dateHour, aggrInfo)}//计算每天每个小时的session数量,通过调用countByKey方法来统计val countMap: collection.Map[String, Long] = timeSessionRDD.countByKey()//转换格式,将countMap转换为Map【date,Map【Hour,Count】】//创建一个hashMap用来转换数据val dateHourCountMap = mutable.HashMap[String,mutable.HashMap[String,Long]]()//遍历上面的ccountMap,取出天和小时for ((dateHour,count) <- countMap){//yyyy-MM-dd_HH  将key拆分,出来天和小时val date = dateHour.split("_")(0)val hour = dateHour.split("_")(1)//从hashMap中取出date天,然后进行模式匹配,判断天dateHourCountMap.get(date) match {case None =>//当天数为none时,n新建一个hashMapdateHourCountMap(date) = new mutable.HashMap[String,Long]()dateHourCountMap(date) += (hour->count)case Some(hourCountMap)=>hourCountMap+=(hour->count)}}println(dateHourCountMap.mkString("\r\n"))//求一天需要抽取的数量,所有天数中的总的session数量就时hashMap的大小val extractNumberPerDay = 100 / dateHourCountMap.size//存放,每天每小时需要抽取的样本的索引值列表  格式为【天,【小时,索引列表】】val dateHourExtractMap = mutable.HashMap[String,mutable.HashMap[String,ListBuffer[Int]]]()//创建一个随机数val random = new Random()//根据每个小时应该抽取的数量,来随机产生索引值def hourExtractMapFunc(hourExtractMap: mutable.HashMap[String, ListBuffer[Int]],hourCountMap: mutable.HashMap[String, Long],sessionCount: Long) = {for ((hour,count) <- hourCountMap){//计算每天每小时需要抽取的session数量var hourExtractNumber:Int = ((count/sessionCount.toDouble) * extractNumberPerDay).round.toInt//判断需要抽取的数量和count的大小,如果需要的大于实际的,则就按照实际取if (hourExtractNumber>count){hourExtractNumber = count.toInt}hourExtractMap.get(hour) match {case None =>hourExtractMap(hour) = new mutable.ListBuffer[Int]()for (i <- 0 until hourExtractNumber){var extractIndex = random.nextInt(count.toInt)//索引值重复的情况,一旦索引值发生重复现象,则需要重新获取indexwhile (hourExtractMap(hour).contains(extractIndex)){extractIndex = random.nextInt(count.toInt)}hourExtractMap(hour) += (extractIndex)}case Some(extractIndexList)=>for (i <- 0 until hourExtractNumber){var extractIndex = random.nextInt(count.toInt)while (hourExtractMap(hour).contains(extractIndex)){extractIndex = random.nextInt(count.toInt)}hourExtractMap(hour) += (extractIndex)}}}}for ((date,hourCountMap) <-dateHourCountMap){//计算出每天session的总数val sessionCount = hourCountMap.values.sum//依旧对索引值列表中的小时数进行匹配dateHourExtractMap.get(date) match {case None =>dateHourExtractMap(date) = new mutable.HashMap[String,mutable.ListBuffer[Int]]()//如果小时不存在,则进行更新到hashMap中hourExtractMapFunc(dateHourExtractMap(date),hourCountMap,sessionCount)case Some(hourExtractMap) =>hourExtractMapFunc(hourExtractMap,hourCountMap,sessionCount)}}}//计算时长、步长的各个范围的占比,并进行持久化def calculateAndPersistAggrState(spark: SparkSession, value: mutable.HashMap[String, Int], taskUUID: String) = {//获取到当前总的sessioncount,val session_count = value(Constants.SESSION_COUNT).toDoubleval visit_length_1s_3s = value.getOrElse(Constants.TIME_PERIOD_1s_3s,0).toDoubleval visit_length_4s_6s = value.getOrElse(Constants.TIME_PERIOD_4s_6s,0).toDoubleval visit_length_7s_9s = value.getOrElse(Constants.TIME_PERIOD_7s_9s,0).toDoubleval visit_length_10s_30s = value.getOrElse(Constants.TIME_PERIOD_10s_30s,0).toDoubleval visit_length_30s_60s = value.getOrElse(Constants.TIME_PERIOD_30s_60s,0).toDoubleval visit_length_1m_3m = value.getOrElse(Constants.TIME_PERIOD_1m_3m,0).toDoubleval visit_length_3m_10m = value.getOrElse(Constants.TIME_PERIOD_3m_10m,0).toDoubleval visit_length_10m_30m = value.getOrElse(Constants.TIME_PERIOD_10m_30m,0).toDoubleval visit_length_30m = value.getOrElse(Constants.TIME_PERIOD_30m,0).toDoubleval step_length_1_3 = value.getOrElse(Constants.STEP_PERIOD_1_3,0).toDoubleval step_length_4_6 = value.getOrElse(Constants.STEP_PERIOD_4_6,0).toDoubleval step_length_7_9 = value.getOrElse(Constants.STEP_PERIOD_7_9,0).toDoubleval step_length_10_30 = value.getOrElse(Constants.STEP_PERIOD_10_30,0).toDoubleval step_length_30_60 = value.getOrElse(Constants.STEP_PERIOD_30_60,0).toDoubleval step_length_60 = value.getOrElse(Constants.STEP_PERIOD_60,0).toDouble//时长各个范围占比val visit_length_1s_3s_ratio = NumberUtils.formatDouble(visit_length_1s_3s/session_count,2)val visit_length_4s_6s_ratio = NumberUtils.formatDouble(visit_length_4s_6s/session_count,2)val visit_length_7s_9s_ratio = NumberUtils.formatDouble(visit_length_7s_9s/session_count,2)val visit_length_10s_30s_ratio =  NumberUtils.formatDouble(visit_length_10s_30s/session_count,2)val visit_length_30s_60s_ratio =  NumberUtils.formatDouble(visit_length_30s_60s/session_count,2)val visit_length_1m_3m_ratio = NumberUtils.formatDouble(visit_length_1m_3m/session_count,2)val visit_length_3m_10m_ratio =NumberUtils.formatDouble(visit_length_3m_10m/session_count,2)val visit_length_10m_30m_ratio =NumberUtils.formatDouble(visit_length_10m_30m/session_count,2)val visit_length_30m_ratio = NumberUtils.formatDouble(visit_length_30m/session_count,2)步长各个范围占比val step_length_1_3_ratio = NumberUtils.formatDouble(step_length_1_3/session_count,2)val step_length_4_6_ratio = NumberUtils.formatDouble(step_length_4_6/session_count,2)val step_length_7_9_ratio = NumberUtils.formatDouble(step_length_7_9/session_count,2)val step_length_10_30_ratio = NumberUtils.formatDouble(step_length_10_30/session_count,2)val step_length_30_60_ratio = NumberUtils.formatDouble(step_length_30_60/session_count,2)val step_length_60_ratio = NumberUtils.formatDouble(step_length_60/session_count,2)//得到数据库中的字段val sessionAggrStat = SessionAggrState(taskUUID,session_count.toInt,visit_length_1s_3s_ratio,visit_length_4s_6s_ratio,visit_length_7s_9s_ratio,visit_length_10s_30s_ratio,visit_length_30s_60s_ratio,visit_length_1m_3m_ratio,visit_length_3m_10m_ratio,visit_length_10m_30m_ratio,visit_length_30m_ratio,step_length_1_3_ratio,step_length_4_6_ratio,step_length_7_9_ratio,step_length_10_30_ratio,step_length_30_60_ratio,step_length_60_ratio)import spark.implicits._//写入到mysql数据库中val sessionAggrStateRDD: RDD[SessionAggrState] = spark.sparkContext.makeRDD(Array(sessionAggrStat))//获取到配置jdbc的环境,连接数据库,保存数据sessionAggrStateRDD.toDF().write.format("jdbc").option("url",ConfigurationManager.config.getString(Constants.JDBC_URL)).option("user",ConfigurationManager.config.getString(Constants.JDBC_USER)).option("password",ConfigurationManager.config.getString(Constants.JDBC_PASSWORD)).option("dbtable","session_aggr_stat").mode(SaveMode.Append).save()}//过滤累加个范围的统计值,时长和步长def filterSessionAndAggrState(sessionAggrInfoRDD: RDD[(String,String)], taskParm: JSONObject, sessionAggrStateAccumulator: SessionAggrStateAccumulator) = {//获取任务的查询条件配置//年龄开始时间和截至时间val startAge: String = ParamUtils.getParam(taskParm,Constants.PARAM_START_AGE)println(startAge)val endAge: String = ParamUtils.getParam(taskParm,Constants.PARAM_END_AGE)//条件:职业、城市、性别、密码、类别val professional = ParamUtils.getParam(taskParm,Constants.PARAM_PROFESSIONALS)val cites: String = ParamUtils.getParam(taskParm,Constants.PARAM_CITIES)val sex = ParamUtils.getParam(taskParm,Constants.PARAM_SEX)val keywords = ParamUtils.getParam(taskParm,Constants.PARAM_KEYWORDS)val categoryIds = ParamUtils.getParam(taskParm,Constants.PARAM_CATEGORY_IDS)//判断获取到的条件、拼接字符串var _parmeter=((if (startAge !=null)Constants.PARAM_START_AGE + "=" + startAge + "|" else "") +(if (endAge != null)Constants.PARAM_END_AGE + "=" + endAge + "|" else "") +(if (professional != null) Constants.PARAM_PROFESSIONALS + "=" + professional + "|" else "") +(if (cites!=null)Constants.PARAM_CITIES + "=" + cites + "|" else "") +(if (sex!=null)Constants.PARAM_SEX + "=" + cites + "|" else "") +(if (keywords!=null)Constants.PARAM_KEYWORDS + "=" + cites + "|" else "") +(if (categoryIds!=null)Constants.PARAM_CATEGORY_IDS + "=" + cites + "|" else ""))//去掉末尾的|字符if (_parmeter.endsWith("|")){_parmeter = _parmeter.substring(0,_parmeter.length()-1)}//将字符串拼接好之后,执行过滤算子val filterSessionAggrInfoRDD: RDD[(String, String)] = sessionAggrInfoRDD.filter {case (sessionid, aggrInfo) =>//判断年龄、职位、性别、城市var success = trueif (!ValidUtils.between(aggrInfo, Constants.FIELD_AGE, _parmeter, Constants.PARAM_START_AGE, Constants.PARAM_END_AGE)) {success = false}if (!ValidUtils.in(aggrInfo, Constants.FIELD_PROFESSIONAL, _parmeter, Constants.PARAM_PROFESSIONALS)) {success = false}if (!ValidUtils.in(aggrInfo, Constants.FIELD_CITY, _parmeter, Constants.PARAM_CITIES)) {success = false}if (!ValidUtils.equal(aggrInfo, Constants.FIELD_SEX, _parmeter, Constants.PARAM_SEX)) {success = false}if (!ValidUtils.in(aggrInfo, Constants.FIELD_SEARCH_KEYWORDS, _parmeter, Constants.PARAM_KEYWORDS)) {success = false}if (!ValidUtils.in(aggrInfo, Constants.FIELD_CATEGORY_ID, _parmeter, Constants.PARAM_CATEGORY_IDS)) {success = false}//保留的数据,累加计算if (success) {sessionAggrStateAccumulator.add(Constants.SESSION_COUNT)//计算访问时长的范围def calculateVisitLength(visitLength: Long) = {if (visitLength >= 1 && visitLength <= 3) {sessionAggrStateAccumulator.add(Constants.TIME_PERIOD_1s_3s)} else if (visitLength >= 4 && visitLength <= 6) {sessionAggrStateAccumulator.add(Constants.TIME_PERIOD_4s_6s)} else if (visitLength >= 7 && visitLength <= 9) {sessionAggrStateAccumulator.add(Constants.TIME_PERIOD_7s_9s)} else if (visitLength >= 10 && visitLength <= 30) {sessionAggrStateAccumulator.add(Constants.TIME_PERIOD_10s_30s)} else if (visitLength > 30 && visitLength <= 60) {sessionAggrStateAccumulator.add(Constants.TIME_PERIOD_30s_60s)} else if (visitLength > 60 && visitLength <= 180) {sessionAggrStateAccumulator.add(Constants.TIME_PERIOD_1m_3m)} else if (visitLength > 180 && visitLength <= 600) {sessionAggrStateAccumulator.add(Constants.TIME_PERIOD_3m_10m)} else if (visitLength > 600 && visitLength <= 1800) {sessionAggrStateAccumulator.add(Constants.TIME_PERIOD_10m_30m)} else if (visitLength > 1800) {sessionAggrStateAccumulator.add(Constants.TIME_PERIOD_30m)}}//使用字符串工具类获取到当前的用户时长val visitLength: Long = StringUtils.getFieldFromConcatString(aggrInfo, "\\|", Constants.FIELD_VISIT_LENGTH).toLong//调用时长判断方法,判断其所在区间calculateVisitLength(visitLength)//计算用户的步长def calculateStepLength(stepLength: Long) = {//判断范围if (stepLength >= 1 && stepLength <= 3) {sessionAggrStateAccumulator.add(Constants.STEP_PERIOD_1_3)} else if (stepLength >= 4 && stepLength <= 6) {sessionAggrStateAccumulator.add(Constants.STEP_PERIOD_4_6)} else if (stepLength >= 7 && stepLength <= 9) {sessionAggrStateAccumulator.add(Constants.STEP_PERIOD_7_9)} else if (stepLength >= 10 && stepLength <= 30) {sessionAggrStateAccumulator.add(Constants.STEP_PERIOD_10_30)} else if (stepLength > 30 && stepLength <= 60) {sessionAggrStateAccumulator.add(Constants.STEP_PERIOD_30_60)} else if (stepLength > 60) {sessionAggrStateAccumulator.add(Constants.STEP_PERIOD_60)}}//获取当前用户步长val stepLength: String = StringUtils.getFieldFromConcatString(aggrInfo, "\\|", Constants.FIELD_STEP_LENGTH)//调用方法,判断范围calculateStepLength(stepLength.toLong)}success}filterSessionAggrInfoRDD}//查询数据,从user_visit_actiondef getActionRDDByDateRange(spark: SparkSession, taskParm: JSONObject) = {//获取开始时间val startDate: String = ParamUtils.getParam(taskParm,Constants.PARAM_START_DATE)//获取结束时间val endDate = ParamUtils.getParam(taskParm,Constants.PARAM_END_DATE)//导入spark的隐式转换import spark.implicits._//在表中查询,注意变量需要用单引号引入val SQLQuery = "select * from user_visit_action where date >= '"+startDate+"' and date <= '" +endDate+"'"//将查询出的数据转换成dataSet,再转换成RDDval ds: Dataset[UserVisitAction] = spark.sql(SQLQuery).as[UserVisitAction]ds.rdd}//聚合解析,分组、计算步长、时长等将其session行为数据聚合成partAggInfo(Sessionid | 查询词集合 |点击物品类别集合 | Session访问时长 | Session访问步长 | Session访问时间)def aggregateBySession(spark: SparkSession, sessionidActionRDD: RDD[(String, UserVisitAction)]) = {//对行为数据按照session粒度Sessionid 进行分组val sessionActionsRDD: RDD[(String, Iterable[UserVisitAction])] = sessionidActionRDD.groupByKey()//对每个session聚合操作val partAggInfoRDD = sessionActionsRDD.map {case (sessionid,userVisitActions)=>//定义buffer用来存储搜索word和行为的idval searchKeyWordsBuffer = new StringBuffer("")val clickCategoryIdsBuffer = new StringBuffer("")var userid = -1L//session的起始时间和结束时间var startTime:Date = nullvar endTime:Date = null//行为步长var stepLength = 0userVisitActions.foreach(uva=>{//判断用户名的空值if (userid == -1L){userid = uva.user_id}val searchKeyWord = uva.search_keywordval click_category_id:Long = uva.click_category_idif (StringUtils.isNotEmpty(searchKeyWord)){if (!searchKeyWordsBuffer.toString.contains(searchKeyWord)){clickCategoryIdsBuffer.append(click_category_id + ",")}}if (click_category_id !=null && click_category_id != -1L){if (!clickCategoryIdsBuffer.toString.contains(click_category_id.toString)){clickCategoryIdsBuffer.append(click_category_id + ",")}}//获取行为时间val actionTime:Date = DateUtils.parseTime(uva.action_time)//获取startTimeif (startTime == null || actionTime.before(startTime)){startTime=actionTimeprintln(startTime)}//获取结束时间if (endTime == null || actionTime.before(endTime)){endTime=actionTime}//行为步长stepLength += 1})//end userVisitActions froeachval searchKeyWords:String=StringUtils.trimComma(searchKeyWordsBuffer.toString)val clickCategoryIds:String = StringUtils.trimComma(clickCategoryIdsBuffer.toString)//计算时长(秒)val visitLength:Long=(endTime.getTime - startTime.getTime)/1000// 拼装字符串  k=v | k=vval partAggrInfo:String= Constants.FIELD_SESSION_ID + "=" + sessionid + "|" +Constants.FIELD_SEARCH_KEYWORDS + "=" + searchKeyWords + "|" +Constants.FIELD_CLICK_CATEGORY_IDS + "=" + clickCategoryIds + "|" +Constants.FIELD_VISIT_LENGTH + "=" + visitLength + "|" +Constants.FIELD_STEP_LENGTH + "=" + stepLength + "|" +Constants.FIELD_START_TIME + "=" + DateUtils.formatTime(startTime)(userid,partAggrInfo)}//println(partAggInfoRDD.collect().mkString("\r\n"))//TODO 和userInfo join操作//查询用户信息表,然后和用户行为表做join操作import spark.implicits._//从表中查询出数据,然后将其转换成dsval ds: Dataset[UserInfo] = spark.sql("select * from user_info").as[UserInfo]val userInfoRDD: RDD[UserInfo] = ds.rdd//将userInfoRDD转换为元组(userid,user)val useridUserInfoRDD: RDD[(Long, UserInfo)] = userInfoRDD.map(user => (user.user_id,user))//useridUserInfo   join partAggrInfoRDDval userIdFullInfoRDD = partAggInfoRDD.join(useridUserInfoRDD)//按照用户id join 在一起得数据,对这些数据进行聚合成一个RDD【String,String】val sessionFullAggrInfoRDD: RDD[(String, String)] = userIdFullInfoRDD.map {//使用模式匹配,对数据进行拼接成需要得类型userId,(partAggrInfo,userInfo)case (userId, (partAggrInfo, userInfo)) => {//使用StringUtils工具类来获取到partAggrInfo中的sessionidval sessionid: String = StringUtils.getFieldFromConcatString(partAggrInfo, "\\|", Constants.FIELD_SESSION_ID)//拼接业务流程中需要得字符串val fullAggrInfo: String = partAggrInfo + "|" +Constants.FIELD_AGE + "=" + userInfo.age + "|" +Constants.FIELD_PROFESSIONAL + "=" + userInfo.professional + "|" +Constants.FIELD_CITY + "=" + userInfo.city + "|" +Constants.FIELD_SEX + "=" + userInfo.sex(sessionid, fullAggrInfo)}}sessionFullAggrInfoRDD}
}
DataModel数据模型样例类
package com.ityouxin.sessionobject DataModel {/*** Session随机抽取表** @param taskid             当前计算批次的ID* @param sessionid          抽取的Session的ID* @param startTime          Session的开始时间* @param searchKeywords     Session的查询字段* @param clickCategoryIds   Session点击的类别id集合*/case class SessionRandomExtract(taskid:String,sessionid:String,startTime:String,searchKeywords:String,clickCategoryIds:String)/*** Session随机抽取详细表** @param taskid            当前计算批次的ID* @param userid            用户的ID* @param sessionid         Session的ID* @param pageid            某个页面的ID* @param actionTime        点击行为的时间点* @param searchKeyword     用户搜索的关键词* @param clickCategoryId   某一个商品品类的ID* @param clickProductId    某一个商品的ID* @param orderCategoryIds  一次订单中所有品类的ID集合* @param orderProductIds   一次订单中所有商品的ID集合* @param payCategoryIds    一次支付中所有品类的ID集合* @param payProductIds     一次支付中所有商品的ID集合**/case class SessionDetail(taskid:String,userid:Long,sessionid:String,pageid:Long,actionTime:String,searchKeyword:String,clickCategoryId:Long,clickProductId:Long,orderCategoryIds:String,orderProductIds:String,payCategoryIds:String,payProductIds:String)/*** 品类Top10表* @param taskid* @param categoryid* @param clickCount* @param orderCount* @param payCount*/case class Top10Category(taskid:String,categoryid:Long,clickCount:Long,orderCount:Long,payCount:Long)case class Top10Session(taskid:String,categoryid:Long,sessionid:String,count:Long)case class SessionAggrState(taskid:String,session_count:Long,visit_length_1s_3s_ratio:Double,visit_length_4s_6s_ratio:Double,visit_length_7s_9s_ratio:Double,visit_length_10s_30s_ratio:Double,visit_length_30s_60s_ratio:Double,visit_length_1m_3m_ratio:Double,visit_length_3m_10m_ratio:Double,visit_length_10m_30m_ratio:Double,visit_length_30m_ratio:Double,step_length_1_3_ratio:Double,step_length_4_6_ratio:Double,step_length_7_9_ratio:Double,step_length_10_30_ratio:Double,step_length_30_60_ratio:Double,step_length_60_ratio:Double)
}
SessionAffrStateAccumulator  累加器类
package com.ityouxin.sessionimport org.apache.spark.util.AccumulatorV2import scala.collection.mutableclass SessionAggrStateAccumulator extends AccumulatorV2[String,mutable.HashMap[String,Int]]{private val aggrStateMap = mutable.HashMap[String,Int]()override def isZero: Boolean = {aggrStateMap.isEmpty}override def copy(): AccumulatorV2[String, mutable.HashMap[String, Int]] = {val newAcc = new SessionAggrStateAccumulatoraggrStateMap.synchronized{newAcc.aggrStateMap ++= this.aggrStateMap}newAcc}override def reset(): Unit = {aggrStateMap.clear()}
//分区内的累加override def add(v: String): Unit = {if (!aggrStateMap.contains(v)){aggrStateMap += (v -> 1)}else{aggrStateMap.update(v,aggrStateMap(v) + 1)}}override def merge(other: AccumulatorV2[String, mutable.HashMap[String, Int]]): Unit = {//将两个map类型的数据进行合并other match {case acc:SessionAggrStateAccumulator =>{(this.aggrStateMap /: acc.aggrStateMap){case (map,(k,v))=>{map +=(k->(v + map.getOrElse(k,0)))}}}}}override def value: mutable.HashMap[String, Int] = {this.aggrStateMap}
}
PageSplitCovertRate  页面转换率样例类
package com.ityouxin.sessioncase class PageSplitConvertRate(taskid:String ,convertRate: String)
val newAcc = new SessionAggrStateAccumulator
aggrStateMap.synchronized{newAcc.aggrStateMap ++= this.aggrStateMap
}
newAcc

}

override def reset(): Unit = {
aggrStateMap.clear()
}
//分区内的累加
override def add(v: String): Unit = {
if (!aggrStateMap.contains(v)){
aggrStateMap += (v -> 1)
}else{
aggrStateMap.update(v,aggrStateMap(v) + 1)
}
}
override def merge(other: AccumulatorV2[String, mutable.HashMap[String, Int]]): Unit = {
//将两个map类型的数据进行合并
other match {
case acc:SessionAggrStateAccumulator =>{
(this.aggrStateMap /: acc.aggrStateMap){
case (map,(k,v))=>{
map +=(k->(v + map.getOrElse(k,0)))
}
}
}
}
}

override def value: mutable.HashMap[String, Int] = {
this.aggrStateMap
}
}


```scala
PageSplitCovertRate  页面转换率样例类
package com.ityouxin.sessioncase class PageSplitConvertRate(taskid:String ,convertRate: String)

大数据之电商分析系统(一)相关推荐

  1. 视频教程-全新大数据企业电商数据仓库项目实战教程-大数据

    全新大数据企业电商数据仓库项目实战教程 张长志技术全才.擅长领域:区块链.大数据.Java等.10余年软件研发及企业培训经验,曾为多家大型企业提供企业内训如中石化,中国联通,中国移动等知名企业.拥有丰 ...

  2. 金融科技大数据产品推荐:蓝金灵—基于大数据的电商企业供应链金融服务平台

    官网 | www.datayuan.cn 微信公众号ID | datayuancn 本产品为数据猿推出的"金融科技价值-数据驱动金融商业裂变"大型主题策划活动第一部分的文章/案例/ ...

  3. 大数据之电商推荐系统

    #大数据之电商推荐系统# 项目系统架构 数据整理 商品数据 商品ID 商品名称 商品种类 商品图片URL 商品标签 productId name categories imageUrl tags 评分 ...

  4. 大数据平台网站日志分析系统

    1:大数据平台网站日志分析系统,项目技术架构图: 2:大数据平台网站日志分析系统,流程图解析,整体流程如下: ETL即hive查询的sql; 但是,由于本案例的前提是处理海量数据,因而,流程中各环节所 ...

  5. 大数据平台日志存储分析系统解决方案

    大数据平台日志存储分析系统是在大数据平台下,针对业务系统产生的日志记录进行存储和分析.日志数据来自ElasticSearch存储的日志历史数据,并将需要存储和分析的日志数据从ElasticSearch ...

  6. 基于大数据的网站日志分析系统

    本文没有任何代码,只有各个模块工作的大体机制和整体流程.算是一个科普文吧,我也对原理一知半解. 基于大数据的网站日志分析系统 1. 日志数据格式 1.1 访问日志 1.1.1 log_format 1 ...

  7. 大数据之电商系统基本概念

    电商系统基本概念 在做电商系统的数据分析时,经常会遇到了两个概念:SKU,SPU SKU(Stock Keeping Unit) SKU全称为库存量基本单元,现在已经被引申为产品统一编号的简称,每种产 ...

  8. 基于用户画像大数据的电商防刷架构

    http://blog.csdn.net/tengxy_cloud/article/details/52576675 一.背景介绍 最近1~2年电商行业飞速发展,各种创业公司犹如雨后春笋大量涌现,商家 ...

  9. 大数据项目--电商业务数据仓库

    文章目录 一.电商业务与数据结构简介 1.1 电商业务流程图 1.2 数据表结构 1.2.1 电商常识 (SKU,SPU) 1.2.2 订单表(order_info) 订单详情表 (order_det ...

最新文章

  1. 白盒测试之语句分支条件覆盖
  2. 韩顺平php程序员算法
  3. 【Python学习系列二十七】pearson相关系数计算
  4. php垂直居中代码_div标签:水平居中和垂直居中的实现(附代码)
  5. obs virtual camera
  6. python中if的效率_Python 代码性能优化技巧
  7. 32要烧写3个bin文件_入门教程3:如何给ESP8266烧录Gagent固件,快速接入机智云实现透传功能...
  8. 【竞赛题解】第22次CCF计算机软件能力认证 B
  9. html百度天气api,百度API 免费接口获取天气预报
  10. WEB下的excel批量导入功能
  11. iNet Network Scanner Mac(网络扫描工具)
  12. SQLServer -ServiceBroker
  13. Photoshop-选区的应用
  14. 用java在画布上画心形线_Java画心形线
  15. uniapp 开发小程序使用iconfont彩色图标
  16. TP6框架全新开发社区系统源码开源
  17. 搭建自己的frp服务器
  18. C语言中怎么表示派 -π
  19. 全球及中国半导体行业发展方向及项目投资建设分析报告2022-2028年版
  20. SUMO的停车场仿真

热门文章

  1. Emacs、cedet、ecb
  2. 独立正交不相关定义关系
  3. 如何书写游戏设计文档
  4. 微信小程序获取手机号的乱码问题
  5. 获取服务器微信头像更改不同规格图片大小
  6. 计算机二级Python真题(六)
  7. python 调用golang_python调用golang编写的动态链接库
  8. 百度对“十进制网络”的官方态度
  9. 如何发送和接收RTP封包的H264,用FFmpeg解码
  10. html查询一张表,HTML5数据查询,表格显示功能代码教程