1. Random Session Extraction

1.1 Data Transformation

Continuing from the code in Note 1:

def sessionRandomExtract(sparkSession: SparkSession,
                         taskUUID: String,
                         sessionId2FilterRDD: RDD[(String, String)]): Unit = {
  // dateHour2FullInfoRDD: RDD[(dateHour, fullInfo)]
  val dateHour2FullInfoRDD = sessionId2FilterRDD.map{
    case (sid, fullInfo) =>
      val startTime = StringUtils.getFieldFromConcatString(fullInfo, "\\|", Constants.FIELD_START_TIME)
      // dateHour: yyyy-MM-dd_HH
      val dateHour = DateUtils.getDateHour(startTime)
      (dateHour, fullInfo)
  }

  // hourCountMap: Map[(dateHour, count)]
  val hourCountMap = dateHour2FullInfoRDD.countByKey()

  // dateHourCountMap: Map[(date, Map[(hour, count)])]
  val dateHourCountMap = new mutable.HashMap[String, mutable.HashMap[String, Long]]()
  for((dateHour, count) <- hourCountMap){
    val date = dateHour.split("_")(0)
    val hour = dateHour.split("_")(1)
    dateHourCountMap.get(date) match {
      case None =>
        dateHourCountMap(date) = new mutable.HashMap[String, Long]()
        dateHourCountMap(date) += (hour -> count)
      case Some(map) =>
        dateHourCountMap(date) += (hour -> count)
    }
  }

  // Question 1: how many days are there: dateHourCountMap.size
  //             how many sessions to extract per day: 100 / dateHourCountMap.size
  val extractPerDay = 100 / dateHourCountMap.size
  // Question 2: how many sessions in one day: dateHourCountMap(date).values.sum
  // Question 3: how many sessions in one hour: dateHourCountMap(date)(hour)

The chain of transformations above: sessionId2FilterRDD is re-keyed by dateHour (yyyy-MM-dd_HH) to form dateHour2FullInfoRDD, countByKey produces hourCountMap (one count per dateHour), and a local loop pivots that into dateHourCountMap, with counts nested first by date and then by hour.
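For intuition, here is that same date/hour pivot run on made-up counts (the dates and numbers below are invented for illustration); getOrElseUpdate plays the role of the None/Some match:

import scala.collection.mutable

// Made-up counts, in the shape countByKey returns: (dateHour, sessionCount)
val hourCountMap = Map("2019-03-08_10" -> 25L, "2019-03-08_11" -> 75L, "2019-03-09_10" -> 40L)

val dateHourCountMap = new mutable.HashMap[String, mutable.HashMap[String, Long]]()
for ((dateHour, count) <- hourCountMap) {
  val Array(date, hour) = dateHour.split("_")
  dateHourCountMap.getOrElseUpdate(date, new mutable.HashMap[String, Long]()) += (hour -> count)
}

// Map(2019-03-08 -> Map(10 -> 25, 11 -> 75), 2019-03-09 -> Map(10 -> 40))
println(dateHourCountMap)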

1.2 Generating Random Indexes

/**
 * extractPerDay    number of sessions to extract per day
 * daySessionCount  total number of sessions in one day
 * hourCountMap     key: hour, value: number of sessions in that hour
 * hourListMap      output parameter: the list of random indexes extracted for each hour
 */
def generateRandomIndexList(extractPerDay: Long,
                            daySessionCount: Long,
                            hourCountMap: mutable.HashMap[String, Long],
                            hourListMap: mutable.HashMap[String, ListBuffer[Int]]): Unit = {
  for((hour, count) <- hourCountMap){
    // How many sessions to extract for this hour, proportional to its share of the day
    var hourExtractCount = ((count / daySessionCount.toDouble) * extractPerDay).toInt
    // Never extract more sessions than this hour actually has
    if(hourExtractCount > count){
      hourExtractCount = count.toInt
    }

    val random = new Random()
    // Generate hourExtractCount distinct random indexes in [0, count)
    val indexList = hourListMap.getOrElseUpdate(hour, new ListBuffer[Int])
    for(i <- 0 until hourExtractCount){
      var index = random.nextInt(count.toInt)
      while(indexList.contains(index)){
        index = random.nextInt(count.toInt)
      }
      indexList.append(index)
    }
  }
}
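A quick sanity check of the proportional allocation with invented numbers: a day with 1000 sessions, of which hour "10" has 300 and hour "11" has 700, and extractPerDay = 50, should yield (300/1000)*50 = 15 and (700/1000)*50 = 35 indexes respectively:

import scala.collection.mutable
import scala.collection.mutable.ListBuffer

val hourCountMap = mutable.HashMap("10" -> 300L, "11" -> 700L) // made-up counts
val hourListMap = new mutable.HashMap[String, ListBuffer[Int]]()

// extractPerDay = 50, daySessionCount = 1000
generateRandomIndexList(50, 1000, hourCountMap, hourListMap)

println(hourListMap("10").size) // 15 distinct indexes, each in [0, 300)
println(hourListMap("11").size) // 35 distinct indexes, each in [0, 700)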

Continuing inside the sessionRandomExtract method:

    // Question 1: how many days: dateHourCountMap.size
    //             sessions to extract per day: 100 / dateHourCountMap.size
    val extractPerDay = 100 / dateHourCountMap.size
    // Question 2: sessions in one day: dateHourCountMap(date).values.sum
    // Question 3: sessions in one hour: dateHourCountMap(date)(hour)

    val dateHourExtractIndexListMap = new mutable.HashMap[String, mutable.HashMap[String, ListBuffer[Int]]]()

    // dateHourCountMap: Map[(date, Map[(hour, count)])]
    for((date, hourCountMap) <- dateHourCountMap){
      val dateSessionCount = hourCountMap.values.sum

      dateHourExtractIndexListMap.get(date) match {
        case None =>
          dateHourExtractIndexListMap(date) = new mutable.HashMap[String, ListBuffer[Int]]()
          generateRandomIndexList(extractPerDay, dateSessionCount, hourCountMap, dateHourExtractIndexListMap(date))
        case Some(map) =>
          generateRandomIndexList(extractPerDay, dateSessionCount, hourCountMap, dateHourExtractIndexListMap(date))
      }
    }

    // At this point we have, for every hour, the indexes of the sessions to extract.
    // Broadcast this large variable to improve task performance. Note the broadcast
    // belongs after the for loop, once the map is fully built.
    val dateHourExtractIndexListMapBd = sparkSession.sparkContext.broadcast(dateHourExtractIndexListMap)

1.3 Extracting Sessions and Writing to the Database

// At this point we have, for every hour, the indexes of the sessions to extract.
// Broadcast this large variable to improve task performance.
val dateHourExtractIndexListMapBd = sparkSession.sparkContext.broadcast(dateHourExtractIndexListMap)

// dateHour2FullInfoRDD: RDD[(dateHour, fullInfo)]
// dateHour2GroupRDD: RDD[(dateHour, iterableFullInfo)]
val dateHour2GroupRDD = dateHour2FullInfoRDD.groupByKey()

// extractSessionRDD: RDD[SessionRandomExtract]
val extractSessionRDD = dateHour2GroupRDD.flatMap{
  case (dateHour, iterableFullInfo) =>
    val date = dateHour.split("_")(0)
    val hour = dateHour.split("_")(1)

    val extractList = dateHourExtractIndexListMapBd.value(date)(hour)
    val extractSessionArrayBuffer = new ArrayBuffer[SessionRandomExtract]()

    var index = 0
    for(fullInfo <- iterableFullInfo){
      if(extractList.contains(index)){
        val sessionId = StringUtils.getFieldFromConcatString(fullInfo, "\\|", Constants.FIELD_SESSION_ID)
        val startTime = StringUtils.getFieldFromConcatString(fullInfo, "\\|", Constants.FIELD_START_TIME)
        val searchKeywords = StringUtils.getFieldFromConcatString(fullInfo, "\\|", Constants.FIELD_SEARCH_KEYWORDS)
        val clickCategories = StringUtils.getFieldFromConcatString(fullInfo, "\\|", Constants.FIELD_CLICK_CATEGORY_IDS)

        val extractSession = SessionRandomExtract(taskUUID, sessionId, startTime, searchKeywords, clickCategories)
        extractSessionArrayBuffer += extractSession
      }
      index += 1
    }
    extractSessionArrayBuffer
}

import sparkSession.implicits._
extractSessionRDD.toDF().write
  .format("jdbc")
  .option("url", ConfigurationManager.config.getString(Constants.JDBC_URL))
  .option("user", ConfigurationManager.config.getString(Constants.JDBC_USER))
  .option("password", ConfigurationManager.config.getString(Constants.JDBC_PASSWORD))
  .option("dbtable", "session_extract_0308")
  .mode(SaveMode.Append)
  .save()
}
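The SessionRandomExtract case class is defined elsewhere in the project and not shown in these notes; a minimal sketch consistent with the constructor call above (the field names here are assumptions, not the project's actual definition):

// Assumed field names, inferred from the constructor arguments above
case class SessionRandomExtract(taskid: String,
                                sessionid: String,
                                startTime: String,
                                searchKeywords: String,
                                clickCategoryIds: String)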

2. Top 10 Popular Category Statistics

2.1 Counting Clicks, Orders, and Payments

def getClickCount(sessionId2FilterActionRDD: RDD[(String, UserVisitAction)]) = {
  //  val clickFilterRDD = sessionId2FilterActionRDD.filter{
  //    case (sessionId, action) => action.click_category_id != -1L
  //  }
  // First filter: keep only the actions that represent clicks
  val clickFilterRDD = sessionId2FilterActionRDD.filter(item => item._2.click_category_id != -1L)
  // Convert the format to prepare for reduceByKey
  val clickNumRDD = clickFilterRDD.map{
    case (sessionId, action) => (action.click_category_id, 1L)
  }
  clickNumRDD.reduceByKey(_ + _)
}

def getOrderCount(sessionId2FilterActionRDD: RDD[(String, UserVisitAction)]) = {
  val orderFilterRDD = sessionId2FilterActionRDD.filter(item => item._2.order_category_ids != null)
  val orderNumRDD = orderFilterRDD.flatMap{
    // action.order_category_ids.split(","): Array[String]
    // action.order_category_ids.split(",").map(item => (item.toLong, 1L))
    // First split the string into an array of strings, then map each element:
    // every element used to be a String and becomes (Long, 1L)
    case (sessionId, action) => action.order_category_ids.split(",").map(item => (item.toLong, 1L))
  }
  orderNumRDD.reduceByKey(_ + _)
}

def getPayCount(sessionId2FilterActionRDD: RDD[(String, UserVisitAction)]) = {
  val payFilterRDD = sessionId2FilterActionRDD.filter(item => item._2.pay_category_ids != null)
  val payNumRDD = payFilterRDD.flatMap{
    case (sid, action) => action.pay_category_ids.split(",").map(item => (item.toLong, 1L))
  }
  payNumRDD.reduceByKey(_ + _)
}

def top10PopularCategories(sparkSession: SparkSession,
                           taskUUID: String,
                           sessionId2FilterActionRDD: RDD[(String, UserVisitAction)]) = {
  // Step 1: collect every category that was clicked, ordered, or paid for
  var cid2CidRDD = sessionId2FilterActionRDD.flatMap{
    case (sid, action) =>
      val categoryBuffer = new ArrayBuffer[(Long, Long)]()
      // click action
      if(action.click_category_id != -1){
        categoryBuffer += ((action.click_category_id, action.click_category_id))
      }else if(action.order_category_ids != null){
        for(orderCid <- action.order_category_ids.split(","))
          categoryBuffer += ((orderCid.toLong, orderCid.toLong))
      }else if(action.pay_category_ids != null){
        for(payCid <- action.pay_category_ids.split(","))
          categoryBuffer += ((payCid.toLong, payCid.toLong))
      }
      categoryBuffer
  }
  cid2CidRDD = cid2CidRDD.distinct()

  // Step 2: count clicks, orders, and payments per category
  val cid2ClickCountRDD = getClickCount(sessionId2FilterActionRDD)
  val cid2OrderCountRDD = getOrderCount(sessionId2FilterActionRDD)
  val cid2PayCountRDD = getPayCount(sessionId2FilterActionRDD)
}
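To see what the flatMap in getOrderCount emits per action, here is the split-and-map step applied to a made-up order_category_ids value:

val orderCategoryIds = "1,2,3" // invented stand-in for action.order_category_ids

val pairs = orderCategoryIds.split(",").map(item => (item.toLong, 1L))

println(pairs.mkString(", ")) // (1,1), (2,1), (3,1) -- reduceByKey(_+_) later sums the 1s per category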

2.2 Computing the Top 10

The sort here orders first by click count, then by order count, and finally by payment count.

def getFullCount(cid2CidRDD: RDD[(Long, Long)],
                 cid2ClickCountRDD: RDD[(Long, Long)],
                 cid2OrderCountRDD: RDD[(Long, Long)],
                 cid2PayCountRDD: RDD[(Long, Long)]) = {
  val cid2ClickInfoRDD = cid2CidRDD.leftOuterJoin(cid2ClickCountRDD).map{
    case (cid, (categoryId, option)) =>
      val clickCount = if(option.isDefined) option.get else 0
      val aggrCount = Constants.FIELD_CATEGORY_ID + "=" + cid + "|" +
        Constants.FIELD_CLICK_COUNT + "=" + clickCount
      (cid, aggrCount)
  }

  val cid2OrderInfoRDD = cid2ClickInfoRDD.leftOuterJoin(cid2OrderCountRDD).map{
    case (cid, (clickInfo, option)) =>
      val orderCount = if(option.isDefined) option.get else 0
      val aggrInfo = clickInfo + "|" +
        Constants.FIELD_ORDER_COUNT + "=" + orderCount
      (cid, aggrInfo)
  }

  val cid2PayInfoRDD = cid2OrderInfoRDD.leftOuterJoin(cid2PayCountRDD).map{
    case (cid, (orderInfo, option)) =>
      val payCount = if(option.isDefined) option.get else 0
      val aggrInfo = orderInfo + "|" +
        Constants.FIELD_PAY_COUNT + "=" + payCount
      (cid, aggrInfo)
  }

  cid2PayInfoRDD
}
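The if(option.isDefined) option.get else 0 pattern above handles categories that have no match on the right side of the leftOuterJoin; with made-up values, getOrElse is the equivalent shorthand:

val clicked: Option[Long] = Some(77L) // join found a click count for this category
val never: Option[Long]   = None      // this category was never clicked

println(if (clicked.isDefined) clicked.get else 0) // 77
println(never.getOrElse(0L))                       // 0 -- same result, shorter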

Continuing inside the top10PopularCategories method:

  // Step 2: count clicks, orders, and payments per category
  val cid2ClickCountRDD = getClickCount(sessionId2FilterActionRDD)
  val cid2OrderCountRDD = getOrderCount(sessionId2FilterActionRDD)
  val cid2PayCountRDD = getPayCount(sessionId2FilterActionRDD)

  // cid2FullCountRDD: RDD[(cid, countInfo)]
  // e.g. (62,categoryid=62|clickCount=77|orderCount=65|payCount=67)
  val cid2FullCountRDD = getFullCount(cid2CidRDD, cid2ClickCountRDD, cid2OrderCountRDD, cid2PayCountRDD)

  // Build the custom secondary-sort key
  val sortKey2FullCountRDD = cid2FullCountRDD.map{
    case (cid, countInfo) =>
      val clickCount = StringUtils.getFieldFromConcatString(countInfo, "\\|", Constants.FIELD_CLICK_COUNT).toLong
      val orderCount = StringUtils.getFieldFromConcatString(countInfo, "\\|", Constants.FIELD_ORDER_COUNT).toLong
      val payCount = StringUtils.getFieldFromConcatString(countInfo, "\\|", Constants.FIELD_PAY_COUNT).toLong

      val sortKey = SortKey(clickCount, orderCount, payCount)
      (sortKey, countInfo)
  }

  val top10CategoryArray = sortKey2FullCountRDD.sortByKey(false).take(10)

  val top10CategoryRDD = sparkSession.sparkContext.makeRDD(top10CategoryArray).map{
    case (sortKey, countInfo) =>
      val cid = StringUtils.getFieldFromConcatString(countInfo, "\\|", Constants.FIELD_CATEGORY_ID).toLong
      val clickCount = sortKey.clickCount
      val orderCount = sortKey.orderCount
      val payCount = sortKey.payCount
      Top10Category(taskUUID, cid, clickCount, orderCount, payCount)
  }

  import sparkSession.implicits._
  top10CategoryRDD.toDF().write
    .format("jdbc")
    .option("url", ConfigurationManager.config.getString(Constants.JDBC_URL))
    .option("user", ConfigurationManager.config.getString(Constants.JDBC_USER))
    .option("password", ConfigurationManager.config.getString(Constants.JDBC_PASSWORD))
    .option("dbtable", "top10_category_0308")
    .mode(SaveMode.Append)
    .save

  // Return the Top 10 array; requirement 3 (top10ActiveSession) consumes it
  top10CategoryArray
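Top10Category, like the other output classes, maps one row per record written to MySQL; a plausible sketch matching the constructor call above (field names assumed, not taken from the project source):

// Assumed field names, inferred from the constructor arguments above
case class Top10Category(taskid: String,
                         categoryid: Long,
                         clickCount: Long,
                         orderCount: Long,
                         payCount: Long)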

The secondary-sort key class:

case class SortKey(clickCount: Long, orderCount: Long, payCount: Long) extends Ordered[SortKey] {
  // this.compare(that)
  // this compare that
  // compare > 0   means this > that
  // compare < 0   means this < that
  // Note: comparing via compareTo instead of (Long - Long).toInt avoids
  // integer overflow for very large counts.
  override def compare(that: SortKey): Int = {
    if(this.clickCount != that.clickCount){
      this.clickCount.compareTo(that.clickCount)
    }else if(this.orderCount != that.orderCount){
      this.orderCount.compareTo(that.orderCount)
    }else{
      this.payCount.compareTo(that.payCount)
    }
  }
}
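A small check of the ordering with invented counts: clicks dominate, then orders break ties, then payments:

val a = SortKey(clickCount = 10, orderCount = 1, payCount = 1)
val b = SortKey(clickCount = 10, orderCount = 5, payCount = 1)
val c = SortKey(clickCount = 20, orderCount = 0, payCount = 0)

println(List(a, b, c).sorted.reverse) // List(c, b, a): c wins on clicks, b beats a on orders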

2.3 Complete Code

import java.util.{Date, Random, UUID}

import commons.conf.ConfigurationManager
import commons.constant.Constants
import commons.model.{UserInfo, UserVisitAction}
import commons.utils._
import net.sf.json.JSONObject
import org.apache.spark.SparkConf
import org.apache.spark.rdd.RDD
import org.apache.spark.sql.{SaveMode, SparkSession}

import scala.collection.mutable
import scala.collection.mutable.{ArrayBuffer, ListBuffer}

object SessionStatisticAgg {

  def main(args: Array[String]): Unit = {
    // Get the query's limiting conditions
    val jsonStr = ConfigurationManager.config.getString(Constants.TASK_PARAMS)
    val taskParam = JSONObject.fromObject(jsonStr)

    // Generate a globally unique primary key
    val taskUUID = UUID.randomUUID().toString

    // Create the SparkConf
    val sparkConf = new SparkConf().setAppName("session").setMaster("local[*]")

    // Create the SparkSession
    val sparkSession = SparkSession.builder().config(sparkConf).enableHiveSupport().getOrCreate()

    // actionRDD: RDD[UserVisitAction]
    val actionRDD = getActionRDD(sparkSession, taskParam)

    // sessionId2ActionRDD: RDD[(sid, UserVisitAction)]
    val sessionId2ActionRDD = actionRDD.map(item => (item.session_id, item))

    // sessionId2GroupRDD: RDD[(sid, Iterable[UserVisitAction])]
    val sessionId2GroupRDD = sessionId2ActionRDD.groupByKey()

    // sparkSession.sparkContext.setCheckpointDir()
    sessionId2GroupRDD.cache()
    // sessionId2GroupRDD.checkpoint()

    // Build the aggregated info for each session
    val sessionId2FullInfoRDD = getFullInfoData(sparkSession, sessionId2GroupRDD)

    // Create the custom accumulator
    val sessionStatAccumulator = new SessionStatAccumulator
    // Register the custom accumulator
    sparkSession.sparkContext.register(sessionStatAccumulator, "sessionAccumulator")

    // Filter the session data
    val sessionId2FilterRDD = getFilteredData(taskParam, sessionStatAccumulator, sessionId2FullInfoRDD)

    sessionId2FilterRDD.count()

    // Compute the final statistics
    getFinalData(sparkSession, taskUUID, sessionStatAccumulator.value)

    // Requirement 2: random session extraction
    // sessionId2FilterRDD: RDD[(sid, fullInfo)] -- one record (one fullInfo) per session
    sessionRandomExtract(sparkSession, taskUUID, sessionId2FilterRDD)

    // sessionId2ActionRDD: RDD[(sessionId, action)]
    // sessionId2FilterRDD: RDD[(sessionId, fullInfo)] -- the sessions matching the filter
    // sessionId2FilterActionRDD: their join
    // All action data that satisfies the filter conditions
    val sessionId2FilterActionRDD = sessionId2ActionRDD.join(sessionId2FilterRDD).map{
      case (sessionId, (action, fullInfo)) => (sessionId, action)
    }

    top10PopularCategories(sparkSession, taskUUID, sessionId2FilterActionRDD)
  }

  def getClickCount(sessionId2FilterActionRDD: RDD[(String, UserVisitAction)]) = {
    //  val clickFilterRDD = sessionId2FilterActionRDD.filter{
    //    case (sessionId, action) => action.click_category_id != -1L
    //  }
    // First filter: keep only the actions that represent clicks
    val clickFilterRDD = sessionId2FilterActionRDD.filter(item => item._2.click_category_id != -1L)
    // Convert the format to prepare for reduceByKey
    val clickNumRDD = clickFilterRDD.map{
      case (sessionId, action) => (action.click_category_id, 1L)
    }
    clickNumRDD.reduceByKey(_ + _)
  }

  def getOrderCount(sessionId2FilterActionRDD: RDD[(String, UserVisitAction)]) = {
    val orderFilterRDD = sessionId2FilterActionRDD.filter(item => item._2.order_category_ids != null)
    val orderNumRDD = orderFilterRDD.flatMap{
      // action.order_category_ids.split(","): Array[String]
      // action.order_category_ids.split(",").map(item => (item.toLong, 1L))
      // First split the string into an array of strings, then map each element:
      // every element used to be a String and becomes (Long, 1L)
      case (sessionId, action) => action.order_category_ids.split(",").map(item => (item.toLong, 1L))
    }
    orderNumRDD.reduceByKey(_ + _)
  }

  def getPayCount(sessionId2FilterActionRDD: RDD[(String, UserVisitAction)]) = {
    val payFilterRDD = sessionId2FilterActionRDD.filter(item => item._2.pay_category_ids != null)
    val payNumRDD = payFilterRDD.flatMap{
      case (sid, action) => action.pay_category_ids.split(",").map(item => (item.toLong, 1L))
    }
    payNumRDD.reduceByKey(_ + _)
  }

  def getFullCount(cid2CidRDD: RDD[(Long, Long)],
                   cid2ClickCountRDD: RDD[(Long, Long)],
                   cid2OrderCountRDD: RDD[(Long, Long)],
                   cid2PayCountRDD: RDD[(Long, Long)]) = {
    val cid2ClickInfoRDD = cid2CidRDD.leftOuterJoin(cid2ClickCountRDD).map{
      case (cid, (categoryId, option)) =>
        val clickCount = if(option.isDefined) option.get else 0
        val aggrCount = Constants.FIELD_CATEGORY_ID + "=" + cid + "|" +
          Constants.FIELD_CLICK_COUNT + "=" + clickCount
        (cid, aggrCount)
    }

    val cid2OrderInfoRDD = cid2ClickInfoRDD.leftOuterJoin(cid2OrderCountRDD).map{
      case (cid, (clickInfo, option)) =>
        val orderCount = if(option.isDefined) option.get else 0
        val aggrInfo = clickInfo + "|" +
          Constants.FIELD_ORDER_COUNT + "=" + orderCount
        (cid, aggrInfo)
    }

    val cid2PayInfoRDD = cid2OrderInfoRDD.leftOuterJoin(cid2PayCountRDD).map{
      case (cid, (orderInfo, option)) =>
        val payCount = if(option.isDefined) option.get else 0
        val aggrInfo = orderInfo + "|" +
          Constants.FIELD_PAY_COUNT + "=" + payCount
        (cid, aggrInfo)
    }

    cid2PayInfoRDD
  }

  def top10PopularCategories(sparkSession: SparkSession,
                             taskUUID: String,
                             sessionId2FilterActionRDD: RDD[(String, UserVisitAction)]) = {
    // Step 1: collect every category that was clicked, ordered, or paid for
    var cid2CidRDD = sessionId2FilterActionRDD.flatMap{
      case (sid, action) =>
        val categoryBuffer = new ArrayBuffer[(Long, Long)]()
        // click action
        if(action.click_category_id != -1){
          categoryBuffer += ((action.click_category_id, action.click_category_id))
        }else if(action.order_category_ids != null){
          for(orderCid <- action.order_category_ids.split(","))
            categoryBuffer += ((orderCid.toLong, orderCid.toLong))
        }else if(action.pay_category_ids != null){
          for(payCid <- action.pay_category_ids.split(","))
            categoryBuffer += ((payCid.toLong, payCid.toLong))
        }
        categoryBuffer
    }
    cid2CidRDD = cid2CidRDD.distinct()

    // Step 2: count clicks, orders, and payments per category
    val cid2ClickCountRDD = getClickCount(sessionId2FilterActionRDD)
    val cid2OrderCountRDD = getOrderCount(sessionId2FilterActionRDD)
    val cid2PayCountRDD = getPayCount(sessionId2FilterActionRDD)

    // cid2FullCountRDD: RDD[(cid, countInfo)]
    // e.g. (62,categoryid=62|clickCount=77|orderCount=65|payCount=67)
    val cid2FullCountRDD = getFullCount(cid2CidRDD, cid2ClickCountRDD, cid2OrderCountRDD, cid2PayCountRDD)

    // Build the custom secondary-sort key
    val sortKey2FullCountRDD = cid2FullCountRDD.map{
      case (cid, countInfo) =>
        val clickCount = StringUtils.getFieldFromConcatString(countInfo, "\\|", Constants.FIELD_CLICK_COUNT).toLong
        val orderCount = StringUtils.getFieldFromConcatString(countInfo, "\\|", Constants.FIELD_ORDER_COUNT).toLong
        val payCount = StringUtils.getFieldFromConcatString(countInfo, "\\|", Constants.FIELD_PAY_COUNT).toLong

        val sortKey = SortKey(clickCount, orderCount, payCount)
        (sortKey, countInfo)
    }

    val top10CategoryArray = sortKey2FullCountRDD.sortByKey(false).take(10)

    val top10CategoryRDD = sparkSession.sparkContext.makeRDD(top10CategoryArray).map{
      case (sortKey, countInfo) =>
        val cid = StringUtils.getFieldFromConcatString(countInfo, "\\|", Constants.FIELD_CATEGORY_ID).toLong
        val clickCount = sortKey.clickCount
        val orderCount = sortKey.orderCount
        val payCount = sortKey.payCount
        Top10Category(taskUUID, cid, clickCount, orderCount, payCount)
    }

    import sparkSession.implicits._
    top10CategoryRDD.toDF().write
      .format("jdbc")
      .option("url", ConfigurationManager.config.getString(Constants.JDBC_URL))
      .option("user", ConfigurationManager.config.getString(Constants.JDBC_USER))
      .option("password", ConfigurationManager.config.getString(Constants.JDBC_PASSWORD))
      .option("dbtable", "top10_category_0308")
      .mode(SaveMode.Append)
      .save

    top10CategoryArray
  }

  def generateRandomIndexList(extractPerDay: Long,
                              daySessionCount: Long,
                              hourCountMap: mutable.HashMap[String, Long],
                              hourListMap: mutable.HashMap[String, ListBuffer[Int]]): Unit = {
    for((hour, count) <- hourCountMap){
      // How many sessions to extract for this hour, proportional to its share of the day
      var hourExtractCount = ((count / daySessionCount.toDouble) * extractPerDay).toInt
      // Never extract more sessions than this hour actually has
      if(hourExtractCount > count){
        hourExtractCount = count.toInt
      }

      val random = new Random()
      // Generate hourExtractCount distinct random indexes in [0, count)
      val indexList = hourListMap.getOrElseUpdate(hour, new ListBuffer[Int])
      for(i <- 0 until hourExtractCount){
        var index = random.nextInt(count.toInt)
        while(indexList.contains(index)){
          index = random.nextInt(count.toInt)
        }
        indexList.append(index)
      }
    }
  }

  def sessionRandomExtract(sparkSession: SparkSession,
                           taskUUID: String,
                           sessionId2FilterRDD: RDD[(String, String)]): Unit = {
    // dateHour2FullInfoRDD: RDD[(dateHour, fullInfo)]
    val dateHour2FullInfoRDD = sessionId2FilterRDD.map{
      case (sid, fullInfo) =>
        val startTime = StringUtils.getFieldFromConcatString(fullInfo, "\\|", Constants.FIELD_START_TIME)
        // dateHour: yyyy-MM-dd_HH
        val dateHour = DateUtils.getDateHour(startTime)
        (dateHour, fullInfo)
    }

    // hourCountMap: Map[(dateHour, count)]
    val hourCountMap = dateHour2FullInfoRDD.countByKey()

    // dateHourCountMap: Map[(date, Map[(hour, count)])]
    val dateHourCountMap = new mutable.HashMap[String, mutable.HashMap[String, Long]]()
    for((dateHour, count) <- hourCountMap){
      val date = dateHour.split("_")(0)
      val hour = dateHour.split("_")(1)
      dateHourCountMap.get(date) match {
        case None =>
          dateHourCountMap(date) = new mutable.HashMap[String, Long]()
          dateHourCountMap(date) += (hour -> count)
        case Some(map) =>
          dateHourCountMap(date) += (hour -> count)
      }
    }

    // Question 1: how many days: dateHourCountMap.size
    //             sessions to extract per day: 100 / dateHourCountMap.size
    val extractPerDay = 100 / dateHourCountMap.size
    // Question 2: sessions in one day: dateHourCountMap(date).values.sum
    // Question 3: sessions in one hour: dateHourCountMap(date)(hour)

    val dateHourExtractIndexListMap = new mutable.HashMap[String, mutable.HashMap[String, ListBuffer[Int]]]()

    // dateHourCountMap: Map[(date, Map[(hour, count)])]
    for((date, hourCountMap) <- dateHourCountMap){
      val dateSessionCount = hourCountMap.values.sum

      dateHourExtractIndexListMap.get(date) match {
        case None =>
          dateHourExtractIndexListMap(date) = new mutable.HashMap[String, ListBuffer[Int]]()
          generateRandomIndexList(extractPerDay, dateSessionCount, hourCountMap, dateHourExtractIndexListMap(date))
        case Some(map) =>
          generateRandomIndexList(extractPerDay, dateSessionCount, hourCountMap, dateHourExtractIndexListMap(date))
      }
    }

    // At this point we have, for every hour, the indexes of the sessions to extract.
    // Broadcast this large variable to improve task performance.
    val dateHourExtractIndexListMapBd = sparkSession.sparkContext.broadcast(dateHourExtractIndexListMap)

    // dateHour2FullInfoRDD: RDD[(dateHour, fullInfo)]
    // dateHour2GroupRDD: RDD[(dateHour, iterableFullInfo)]
    val dateHour2GroupRDD = dateHour2FullInfoRDD.groupByKey()

    // extractSessionRDD: RDD[SessionRandomExtract]
    val extractSessionRDD = dateHour2GroupRDD.flatMap{
      case (dateHour, iterableFullInfo) =>
        val date = dateHour.split("_")(0)
        val hour = dateHour.split("_")(1)

        val extractList = dateHourExtractIndexListMapBd.value(date)(hour)
        val extractSessionArrayBuffer = new ArrayBuffer[SessionRandomExtract]()

        var index = 0
        for(fullInfo <- iterableFullInfo){
          if(extractList.contains(index)){
            val sessionId = StringUtils.getFieldFromConcatString(fullInfo, "\\|", Constants.FIELD_SESSION_ID)
            val startTime = StringUtils.getFieldFromConcatString(fullInfo, "\\|", Constants.FIELD_START_TIME)
            val searchKeywords = StringUtils.getFieldFromConcatString(fullInfo, "\\|", Constants.FIELD_SEARCH_KEYWORDS)
            val clickCategories = StringUtils.getFieldFromConcatString(fullInfo, "\\|", Constants.FIELD_CLICK_CATEGORY_IDS)

            val extractSession = SessionRandomExtract(taskUUID, sessionId, startTime, searchKeywords, clickCategories)
            extractSessionArrayBuffer += extractSession
          }
          index += 1
        }
        extractSessionArrayBuffer
    }

    import sparkSession.implicits._
    extractSessionRDD.toDF().write
      .format("jdbc")
      .option("url", ConfigurationManager.config.getString(Constants.JDBC_URL))
      .option("user", ConfigurationManager.config.getString(Constants.JDBC_USER))
      .option("password", ConfigurationManager.config.getString(Constants.JDBC_PASSWORD))
      .option("dbtable", "session_extract_0308")
      .mode(SaveMode.Append)
      .save()
  }

  def getFinalData(sparkSession: SparkSession,
                   taskUUID: String,
                   value: mutable.HashMap[String, Int]): Unit = {
    // Number of sessions that matched the filter conditions
    val session_count = value.getOrElse(Constants.SESSION_COUNT, 1).toDouble

    // Session counts per visit-length range
    val visit_length_1s_3s = value.getOrElse(Constants.TIME_PERIOD_1s_3s, 0)
    val visit_length_4s_6s = value.getOrElse(Constants.TIME_PERIOD_4s_6s, 0)
    val visit_length_7s_9s = value.getOrElse(Constants.TIME_PERIOD_7s_9s, 0)
    val visit_length_10s_30s = value.getOrElse(Constants.TIME_PERIOD_10s_30s, 0)
    val visit_length_30s_60s = value.getOrElse(Constants.TIME_PERIOD_30s_60s, 0)
    val visit_length_1m_3m = value.getOrElse(Constants.TIME_PERIOD_1m_3m, 0)
    val visit_length_3m_10m = value.getOrElse(Constants.TIME_PERIOD_3m_10m, 0)
    val visit_length_10m_30m = value.getOrElse(Constants.TIME_PERIOD_10m_30m, 0)
    val visit_length_30m = value.getOrElse(Constants.TIME_PERIOD_30m, 0)

    // Session counts per step-length range
    val step_length_1_3 = value.getOrElse(Constants.STEP_PERIOD_1_3, 0)
    val step_length_4_6 = value.getOrElse(Constants.STEP_PERIOD_4_6, 0)
    val step_length_7_9 = value.getOrElse(Constants.STEP_PERIOD_7_9, 0)
    val step_length_10_30 = value.getOrElse(Constants.STEP_PERIOD_10_30, 0)
    val step_length_30_60 = value.getOrElse(Constants.STEP_PERIOD_30_60, 0)
    val step_length_60 = value.getOrElse(Constants.STEP_PERIOD_60, 0)

    val visit_length_1s_3s_ratio = NumberUtils.formatDouble(visit_length_1s_3s / session_count, 2)
    val visit_length_4s_6s_ratio = NumberUtils.formatDouble(visit_length_4s_6s / session_count, 2)
    val visit_length_7s_9s_ratio = NumberUtils.formatDouble(visit_length_7s_9s / session_count, 2)
    val visit_length_10s_30s_ratio = NumberUtils.formatDouble(visit_length_10s_30s / session_count, 2)
    val visit_length_30s_60s_ratio = NumberUtils.formatDouble(visit_length_30s_60s / session_count, 2)
    val visit_length_1m_3m_ratio = NumberUtils.formatDouble(visit_length_1m_3m / session_count, 2)
    val visit_length_3m_10m_ratio = NumberUtils.formatDouble(visit_length_3m_10m / session_count, 2)
    val visit_length_10m_30m_ratio = NumberUtils.formatDouble(visit_length_10m_30m / session_count, 2)
    val visit_length_30m_ratio = NumberUtils.formatDouble(visit_length_30m / session_count, 2)

    val step_length_1_3_ratio = NumberUtils.formatDouble(step_length_1_3 / session_count, 2)
    val step_length_4_6_ratio = NumberUtils.formatDouble(step_length_4_6 / session_count, 2)
    val step_length_7_9_ratio = NumberUtils.formatDouble(step_length_7_9 / session_count, 2)
    val step_length_10_30_ratio = NumberUtils.formatDouble(step_length_10_30 / session_count, 2)
    val step_length_30_60_ratio = NumberUtils.formatDouble(step_length_30_60 / session_count, 2)
    val step_length_60_ratio = NumberUtils.formatDouble(step_length_60 / session_count, 2)

    val stat = SessionAggrStat(taskUUID, session_count.toInt,
      visit_length_1s_3s_ratio, visit_length_4s_6s_ratio, visit_length_7s_9s_ratio,
      visit_length_10s_30s_ratio, visit_length_30s_60s_ratio, visit_length_1m_3m_ratio,
      visit_length_3m_10m_ratio, visit_length_10m_30m_ratio, visit_length_30m_ratio,
      step_length_1_3_ratio, step_length_4_6_ratio, step_length_7_9_ratio,
      step_length_10_30_ratio, step_length_30_60_ratio, step_length_60_ratio)

    val statRDD = sparkSession.sparkContext.makeRDD(Array(stat))

    import sparkSession.implicits._
    statRDD.toDF().write
      .format("jdbc")
      .option("url", ConfigurationManager.config.getString(Constants.JDBC_URL))
      .option("user", ConfigurationManager.config.getString(Constants.JDBC_USER))
      .option("password", ConfigurationManager.config.getString(Constants.JDBC_PASSWORD))
      .option("dbtable", "session_ration_0308")
      .mode(SaveMode.Append)
      .save()
  }

  def calculateVisitLength(visitLength: Long, sessionStatisticAccumulator: SessionStatAccumulator): Unit = {
    if(visitLength >= 1 && visitLength <= 3){
      sessionStatisticAccumulator.add(Constants.TIME_PERIOD_1s_3s)
    }else if(visitLength >= 4 && visitLength <= 6){
      sessionStatisticAccumulator.add(Constants.TIME_PERIOD_4s_6s)
    }else if(visitLength >= 7 && visitLength <= 9){
      sessionStatisticAccumulator.add(Constants.TIME_PERIOD_7s_9s)
    }else if(visitLength >= 10 && visitLength <= 30){
      sessionStatisticAccumulator.add(Constants.TIME_PERIOD_10s_30s)
    }else if(visitLength > 30 && visitLength <= 60){
      sessionStatisticAccumulator.add(Constants.TIME_PERIOD_30s_60s)
    }else if(visitLength > 60 && visitLength <= 180){
      sessionStatisticAccumulator.add(Constants.TIME_PERIOD_1m_3m)
    }else if(visitLength > 180 && visitLength <= 600){
      sessionStatisticAccumulator.add(Constants.TIME_PERIOD_3m_10m)
    }else if(visitLength > 600 && visitLength <= 1800){
      sessionStatisticAccumulator.add(Constants.TIME_PERIOD_10m_30m)
    }else if(visitLength > 1800){
      sessionStatisticAccumulator.add(Constants.TIME_PERIOD_30m)
    }
  }

  def calculateStepLength(stepLength: Long, sessionStatisticAccumulator: SessionStatAccumulator): Unit = {
    if(stepLength >= 1 && stepLength <= 3){
      sessionStatisticAccumulator.add(Constants.STEP_PERIOD_1_3)
    }else if(stepLength >= 4 && stepLength <= 6){
      sessionStatisticAccumulator.add(Constants.STEP_PERIOD_4_6)
    }else if(stepLength >= 7 && stepLength <= 9){
      sessionStatisticAccumulator.add(Constants.STEP_PERIOD_7_9)
    }else if(stepLength >= 10 && stepLength <= 30){
      sessionStatisticAccumulator.add(Constants.STEP_PERIOD_10_30)
    }else if(stepLength > 30 && stepLength <= 60){
      sessionStatisticAccumulator.add(Constants.STEP_PERIOD_30_60)
    }else if(stepLength > 60){
      sessionStatisticAccumulator.add(Constants.STEP_PERIOD_60)
    }
  }

  def getFilteredData(taskParam: JSONObject,
                      sessionStatAccumulator: SessionStatAccumulator,
                      sessionId2FullInfoRDD: RDD[(String, String)]) = {
    val startAge = ParamUtils.getParam(taskParam, Constants.PARAM_START_AGE)
    val endAge = ParamUtils.getParam(taskParam, Constants.PARAM_END_AGE)
    val professionals = ParamUtils.getParam(taskParam, Constants.PARAM_PROFESSIONALS)
    val cities = ParamUtils.getParam(taskParam, Constants.PARAM_CITIES)
    val sex = ParamUtils.getParam(taskParam, Constants.PARAM_SEX)
    val keywords = ParamUtils.getParam(taskParam, Constants.PARAM_KEYWORDS)
    val categoryIds = ParamUtils.getParam(taskParam, Constants.PARAM_CATEGORY_IDS)

    var filterInfo = (if(startAge != null) Constants.PARAM_START_AGE + "=" + startAge + "|" else "") +
      (if(endAge != null) Constants.PARAM_END_AGE + "=" + endAge + "|" else "") +
      (if(professionals != null) Constants.PARAM_PROFESSIONALS + "=" + professionals + "|" else "") +
      (if(cities != null) Constants.PARAM_CITIES + "=" + cities + "|" else "") +
      (if(sex != null) Constants.PARAM_SEX + "=" + sex + "|" else "") +
      (if(keywords != null) Constants.PARAM_KEYWORDS + "=" + keywords + "|" else "") +
      (if(categoryIds != null) Constants.PARAM_CATEGORY_IDS + "=" + categoryIds else "")

    // Strip a trailing separator when the last condition was absent
    if(filterInfo.endsWith("|"))
      filterInfo = filterInfo.substring(0, filterInfo.length - 1)

    val sessionId2FilterRDD = sessionId2FullInfoRDD.filter{
      case (sessionId, fullInfo) =>
        var success = true

        if(!ValidUtils.between(fullInfo, Constants.FIELD_AGE, filterInfo, Constants.PARAM_START_AGE, Constants.PARAM_END_AGE))
          success = false
        if(!ValidUtils.in(fullInfo, Constants.FIELD_PROFESSIONAL, filterInfo, Constants.PARAM_PROFESSIONALS))
          success = false
        if(!ValidUtils.in(fullInfo, Constants.FIELD_CITY, filterInfo, Constants.PARAM_CITIES))
          success = false
        if(!ValidUtils.equal(fullInfo, Constants.FIELD_SEX, filterInfo, Constants.PARAM_SEX))
          success = false
        if(!ValidUtils.in(fullInfo, Constants.FIELD_SEARCH_KEYWORDS, filterInfo, Constants.PARAM_KEYWORDS))
          success = false
        if(!ValidUtils.in(fullInfo, Constants.FIELD_CATEGORY_ID, filterInfo, Constants.PARAM_CATEGORY_IDS))
          success = false

        if(success){
          // Reaching this point means the session matched all filter conditions; count it
          sessionStatAccumulator.add(Constants.SESSION_COUNT)

          val visitLength = StringUtils.getFieldFromConcatString(fullInfo, "\\|", Constants.FIELD_VISIT_LENGTH).toLong
          val stepLength = StringUtils.getFieldFromConcatString(fullInfo, "\\|", Constants.FIELD_STEP_LENGTH).toLong

          calculateVisitLength(visitLength, sessionStatAccumulator)
          calculateStepLength(stepLength, sessionStatAccumulator)
        }
        success
    }
    sessionId2FilterRDD
  }

  def getFullInfoData(sparkSession: SparkSession,
                      sessionId2GroupRDD: RDD[(String, Iterable[UserVisitAction])]) = {
    val userId2AggrInfoRDD = sessionId2GroupRDD.map{
      case (sid, iterableAction) =>
        var startTime: Date = null
        var endTime: Date = null
        var userId = -1L
        val searchKeywords = new StringBuffer("")
        val clickCategories = new StringBuffer("")
        var stepLength = 0

        for(action <- iterableAction){
          if(userId == -1L){
            userId = action.user_id
          }

          val actionTime = DateUtils.parseTime(action.action_time)
          if(startTime == null || startTime.after(actionTime))
            startTime = actionTime
          if(endTime == null || endTime.before(actionTime))
            endTime = actionTime

          val searchKeyword = action.search_keyword
          val clickCategory = action.click_category_id

          if(StringUtils.isNotEmpty(searchKeyword) && !searchKeywords.toString.contains(searchKeyword))
            searchKeywords.append(searchKeyword + ",")
          if(clickCategory != -1L && !clickCategories.toString.contains(clickCategory))
            clickCategories.append(clickCategory + ",")

          stepLength += 1
        }

        val searchKw = StringUtils.trimComma(searchKeywords.toString)
        val clickCg = StringUtils.trimComma(clickCategories.toString)

        val visitLength = (endTime.getTime - startTime.getTime) / 1000

        val aggrInfo = Constants.FIELD_SESSION_ID + "=" + sid + "|" +
          Constants.FIELD_SEARCH_KEYWORDS + "=" + searchKw + "|" +
          Constants.FIELD_CLICK_CATEGORY_IDS + "=" + clickCg + "|" +
          Constants.FIELD_VISIT_LENGTH + "=" + visitLength + "|" +
          Constants.FIELD_STEP_LENGTH + "=" + stepLength + "|" +
          Constants.FIELD_START_TIME + "=" + DateUtils.formatTime(startTime)

        (userId, aggrInfo)
    }

    val sql = "select * from user_info"

    import sparkSession.implicits._
    // sparkSession.sql(sql): DataFrame, i.e. Dataset[Row]
    // sparkSession.sql(sql).as[UserInfo]: Dataset[UserInfo]
    // sparkSession.sql(sql).as[UserInfo].rdd: RDD[UserInfo]
    // sparkSession.sql(sql).as[UserInfo].rdd.map(item => (item.user_id, item)): RDD[(userId, UserInfo)]
    val userInfoRDD = sparkSession.sql(sql).as[UserInfo].rdd.map(item => (item.user_id, item))

    userId2AggrInfoRDD.join(userInfoRDD).map{
      case (userId, (aggrInfo, userInfo)) =>
        val age = userInfo.age
        val professional = userInfo.professional
        val sex = userInfo.sex
        val city = userInfo.city

        val fullInfo = aggrInfo + "|" + Constants.FIELD_AGE + "=" + age + "|" +
          Constants.FIELD_PROFESSIONAL + "=" + professional + "|" +
          Constants.FIELD_SEX + "=" + sex + "|" +
          Constants.FIELD_CITY + "=" + city

        val sessionId = StringUtils.getFieldFromConcatString(aggrInfo, "\\|", Constants.FIELD_SESSION_ID)

        (sessionId, fullInfo)
    }
  }

  def getActionRDD(sparkSession: SparkSession, taskParam: JSONObject) = {
    val startDate = ParamUtils.getParam(taskParam, Constants.PARAM_START_DATE)
    val endDate = ParamUtils.getParam(taskParam, Constants.PARAM_END_DATE)

    val sql = "select * from user_visit_action where date>='" + startDate + "' and date<='" + endDate + "'"

    import sparkSession.implicits._
    // sparkSession.sql(sql): DataFrame, i.e. Dataset[Row]
    // sparkSession.sql(sql).as[UserVisitAction]: Dataset[UserVisitAction]
    // sparkSession.sql(sql).as[UserVisitAction].rdd: RDD[UserVisitAction]
    sparkSession.sql(sql).as[UserVisitAction].rdd
  }
}

3. Top 10 Active Sessions within the Top 10 Categories


def top10ActiveSession(sparkSession: SparkSession,
                       taskUUID: String,
                       sessionId2FilterActionRDD: RDD[(String, UserVisitAction)],
                       top10CategoryArray: Array[(SortKey, String)]): Unit = {
  // Step 1: keep only the actions that clicked one of the Top 10 categories
  // Approach 1: join
  //  val cid2CountInfoRDD = sparkSession.sparkContext.makeRDD(top10CategoryArray).map{
  //    case (sortKey, countInfo) =>
  //      val cid = StringUtils.getFieldFromConcatString(countInfo, "\\|", Constants.FIELD_CATEGORY_ID).toLong
  //      (cid, countInfo)
  //  }
  //
  //  val cid2ActionRDD = sessionId2FilterActionRDD.map{
  //    case (sessionId, action) =>
  //      val cid = action.click_category_id
  //      (cid, action)
  //  }
  //
  //  val sessionId2ActionRDD = cid2CountInfoRDD.join(cid2ActionRDD).map{
  //    case (cid, (countInfo, action)) =>
  //      val sid = action.session_id
  //      (sid, action)
  //  }

  // Approach 2: filter
  // cidArray: Array[Long] holding the Top 10 category IDs
  val cidArray = top10CategoryArray.map{
    case (sortKey, countInfo) =>
      val cid = StringUtils.getFieldFromConcatString(countInfo, "\\|", Constants.FIELD_CATEGORY_ID).toLong
      cid
  }

  // All actions that passed the filter conditions and clicked a Top 10 category
  val sessionId2ActionRDD = sessionId2FilterActionRDD.filter{
    case (sessionId, action) => cidArray.contains(action.click_category_id)
  }

  // Group by sessionId
  val sessionId2GroupRDD = sessionId2ActionRDD.groupByKey()

  // cid2SessionCountRDD: RDD[(cid, sessionCount)]
  val cid2SessionCountRDD = sessionId2GroupRDD.flatMap{
    case (sessionId, iterableAction) =>
      val categoryCountMap = new mutable.HashMap[Long, Long]()
      for(action <- iterableAction){
        val cid = action.click_category_id
        if(!categoryCountMap.contains(cid))
          categoryCountMap += (cid -> 0)
        categoryCountMap.update(cid, categoryCountMap(cid) + 1)
      }
      // One record per category this session clicked, carrying its click count
      for((cid, count) <- categoryCountMap)
        yield (cid, sessionId + "=" + count)
  }
}
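Walking one made-up session through the flatMap above: suppose session "session_abc" clicked category 5 three times and category 8 once. Then:

// categoryCountMap ends up as Map(5 -> 3, 8 -> 1), and the yield emits:
//   (5, "session_abc=3")
//   (8, "session_abc=1")
// After the groupByKey in the next step, each category id collects every
// "sessionId=count" string from every session that clicked it.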

Continuing the method above:

// cid2GroupRDD: RDD[(cid, iterableSessionCount)]
// Each record is a category id together with the per-session click counts of
// every session that clicked it
val cid2GroupRDD = cid2SessionCountRDD.groupByKey()

// top10SessionRDD: RDD[Top10Session]
val top10SessionRDD = cid2GroupRDD.flatMap{
  case (cid, iterableSessionCount) =>
    // true:  item1 comes first
    // false: item2 comes first
    // item: sessionCount, a String of the form "sessionId=count"
    val sortList = iterableSessionCount.toList.sortWith((item1, item2) => {
      item1.split("=")(1).toLong > item2.split("=")(1).toLong
    }).take(10)

    val top10Session = sortList.map{
      // item: sessionCount, a String of the form "sessionId=count"
      case item =>
        val sessionId = item.split("=")(0)
        val count = item.split("=")(1).toLong
        Top10Session(taskUUID, cid, sessionId, count)
    }
    top10Session
}

import sparkSession.implicits._
top10SessionRDD.toDF().write
  .format("jdbc")
  .option("url", ConfigurationManager.config.getString(Constants.JDBC_URL))
  .option("user", ConfigurationManager.config.getString(Constants.JDBC_USER))
  .option("password", ConfigurationManager.config.getString(Constants.JDBC_PASSWORD))
  .option("dbtable", "top10_session_0308")
  .mode(SaveMode.Append)
  .save()
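Top10Session is again a plain output row; a sketch consistent with the constructor call above (field names are assumptions, not the project's actual definition):

// Assumed field names, inferred from the constructor arguments above
case class Top10Session(taskid: String,
                        categoryid: Long,
                        sessionid: String,
                        clickCount: Long)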
