本篇文章记录用户访问session分析-top10热门品类之本地测试。

在测试的过程中,到很多问题。

问题一:二次排序需要序列化,否则会在程序运行的时候报错。

public class CategorySortKey implements Ordered<CategorySortKey> ,Serializable

问题二: 必须进行去重 , 如果不去重,会出现重复的categoryid,排序会对重复的Categoryid的categoryInfo进行排序 ,最后很可能拿到重复的数

categoryidRDD = categoryidRDD.distinct();

完整代码

UserVisitSessionAnlyizSpark.java

package graduation.java.spark;/*** FileName: UserVisitSessionAnlyizSpark* Author:   hadoop* Email:    3165845957@qq.com* Date:     19-3-1 上午10:41* Description:* 用户访问session分析Spark作业** 接收用户创建的分析任务,用户可能指定的条件如下:** 1、时间范围:起始日期~结束日期* 2、性别:男或女* 3、年龄范围* 4、职业:多选* 5、城市:多选* 6、搜索词:多个搜索词,只要某个session中的任何一个action搜索过指定的关键词,那么session就符合条件* 7、点击品类:多个品类,只要某个session中的任何一个action点击过某个品类,那么session就符合条件** 我们的spark作业如何接受用户创建的任务?** J2EE平台在接收用户创建任务的请求之后,会将任务信息插入MySQL的task表中,任务参数以JSON格式封装在task_param* 字段中** 接着J2EE平台会执行我们的spark-submit shell脚本,并将taskid作为参数传递给spark-submit shell脚本* spark-submit shell脚本,在执行时,是可以接收参数的,并且会将接收的参数,传递给Spark作业的main函数* 参数就封装在main函数的args数组中** 这是spark本身提供的特性*/import com.alibaba.fastjson.JSONObject;
import graduation.java.conf.ConfigurationManager;
import graduation.java.constant.Constants;
import graduation.java.dao.*;
import graduation.java.domain.*;
import graduation.java.factory.DAOFactory;
import graduation.java.test.MockData;
import graduation.java.util.*;
import org.apache.log4j.Level;
import org.apache.log4j.Logger;
import org.apache.spark.Accumulator;
import org.apache.spark.SparkConf;
import org.apache.spark.SparkContext;
import org.apache.spark.api.java.JavaPairRDD;
import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.api.java.JavaSparkContext;
import org.apache.spark.api.java.Optional;
import org.apache.spark.api.java.function.*;
import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Row;
import org.apache.spark.sql.hive.HiveContext;
import scala.Tuple2;
import org.apache.spark.sql.SQLContext;
import scala.collection.immutable.Stream;import java.util.*;/*** 用户访问session分析Spark作业***/
public class UserVisitSessionAnalyzeSpark {public static void main(String[] args) {Logger.getLogger("org").setLevel(Level.ERROR);args = new String[]{"1"};// 构建Spark上下文SparkConf conf = new SparkConf().setAppName(Constants.SPARK_APP_NAME_SESSION).setMaster("local");JavaSparkContext sc = new JavaSparkContext(conf);SQLContext sqlContext = getSQLContext(sc.sc());// 生成模拟测试数据mockData(sc, sqlContext);//创建需要使用的DAO组件ITaskDAO taskDAO = DAOFactory.getTaskDAO();//首先查询书来指定的任务,并获取任务查询参数long taskid = ParamUtils.getTaskIdFromArgs(args);Task task = taskDAO.findById(taskid);//测试//System.out.println("taskId: "+taskid);JSONObject taskParam = JSONObject.parseObject(task.getTaskParam());//测试/*if (taskParam !=null){System.out.println("taskParam is "+ taskParam.values());}*/// 如果要进行session粒度的数据聚合// 首先要从user_visit_action表中,查询出来指定日期范围内的行为数据JavaRDD<Row> actionRDD = getActionRDDByDateRange(sqlContext,taskParam);JavaPairRDD<String,Row> session2actionRDD = getSessionid2ActionRDD(actionRDD);/*System.out.println("*****************************************");actionRDD.rdd().count();System.out.println("*****************************************");for (Row row : actionRDD.take(10)){System.out.println(row.toString());}*/// 首先,可以将行为数据,按照session_id进行groupByKey分组// 此时的数据的粒度就是session粒度了,然后呢,可以将session粒度的数据// 与用户信息数据,进行join// 然后就可以获取到session粒度的数据,同时呢,数据里面还包含了session对应的user的信息//测试//System.out.println("sessionid2AggrInfoRDD");JavaPairRDD<String,String> sessionid2AggrInfoRDD = aggregateBySession(sqlContext,actionRDD);//测试//sessionid2AggrInfoRDD.count();/* System.out.println("**************sessionid2AggrInfoRDD**************");for (Tuple2<String,String> tuple2 : sessionid2AggrInfoRDD.take(10)){System.out.println(tuple2.toString());}System.out.println("**************sessionid2AggrInfoRDD**************");*/// 接着,就要针对session粒度的聚合数据,按照使用者指定的筛选参数进行数据过滤// 相当于我们自己编写的算子,是要访问外面的任务参数对象的// 所以,大家记得我们之前说的,匿名内部类(算子函数),访问外部对象,是要给外部对象使用final修饰的// 重构,同时进行过滤和统计Accumulator<String> sessionAggrStatAccumulator = sc.accumulator("", new SessionAggrStatAccumulator());JavaPairRDD<String,String> filteredSessionid2AggrInfoRDD = filterSessionAndAggrStat(sessionid2AggrInfoRDD,taskParam,sessionAggrStatAccumulator);//测试/* System.out.println("\n\n\n\n\filteredSessionid2InfoRDD");filteredSessionid2AggrInfoRDD.count();for(Tuple2<String,String> tuple : filteredSessionid2AggrInfoRDD.take(10)){System.out.println(tuple);}*//*** 对于Accumulator这种分布式累加计算的变量的使用,有一个重要说明** 从Accumulator中,获取数据,插入数据库的时候,一定要,一定要,是在有某一个action操作以后* 再进行。。。** 如果没有action的话,那么整个程序根本不会运行。。。** 是不是在calculateAndPersisitAggrStat方法之后,运行一个action操作,比如count、take* 不对!!!** 必须把能够触发job执行的操作,放在最终写入MySQL方法之前** 计算出来的结果,在J2EE中,是怎么显示的,是用两张柱状图显示*//*System.out.println(filteredSessionid2AggrInfoRDD.count());*//*** 特别说明* 我们知道,要将上一个功能的session聚合统计数据获取到,就必须是在一个action操作触发job之后* 才能从Accumulator中获取数据,否则是获取不到数据的,因为没有job执行,Accumulator的值为空* 所以,我们在这里,将随机抽取的功能的实现代码,放在session聚合统计功能的最终计算和写库之前* 因为随机抽取功能中,有一个countByKey算子,是action操作,会触发job*/randomExtractSession(task.getTaskId(),filteredSessionid2AggrInfoRDD,session2actionRDD);//计算出各个范围的session占比,并写入MysqlcalculateAndPersistAggrStat(sessionAggrStatAccumulator.value(),task.getTaskId());/*** session聚合统计(统计出访问时长和访问步长,各个区间的session数量占总session数量的比例)** 如果不进行重构,直接来实现,思路:* 1、actionRDD,映射成<sessionid,Row>的格式* 2、按sessionid聚合,计算出每个session的访问时长和访问步长,生成一个新的RDD* 3、遍历新生成的RDD,将每个session的访问时长和访问步长,去更新自定义Accumulator中的对应的值* 4、使用自定义Accumulator中的统计值,去计算各个区间的比例* 5、将最后计算出来的结果,写入MySQL对应的表中** 普通实现思路的问题:* 1、为什么还要用actionRDD,去映射?其实我们之前在session聚合的时候,映射已经做过了。多此一举* 2、是不是一定要,为了session的聚合这个功能,单独去遍历一遍session?其实没有必要,已经有session数据*        之前过滤session的时候,其实,就相当于,是在遍历session,那么这里就没有必要再过滤一遍了** 重构实现思路:* 1、不要去生成任何新的RDD(处理上亿的数据)* 2、不要去单独遍历一遍session的数据(处理上千万的数据)* 3、可以在进行session聚合的时候,就直接计算出来每个session的访问时长和访问步长* 4、在进行过滤的时候,本来就要遍历所有的聚合session信息,此时,就可以在某个session通过筛选条件后*         将其访问时长和访问步长,累加到自定义的Accumulator上面去* 5、就是两种截然不同的思考方式,和实现方式,在面对上亿,上千万数据的时候,甚至可以节省时间长达*      半个小时,或者数个小时** 开发Spark大型复杂项目的一些经验准则:* 1、尽量少生成RDD* 2、尽量少对RDD进行算子操作,如果有可能,尽量在一个算子里面,实现多个需要做的功能* 3、尽量少对RDD进行shuffle算子操作,比如groupByKey、reduceByKey、sortByKey(map、mapToPair)*      shuffle操作,会导致大量的磁盘读写,严重降低性能*      有shuffle的算子,和没有shuffle的算子,甚至性能,会达到几十分钟,甚至数个小时的差别*       有shfufle的算子,很容易导致数据倾斜,一旦数据倾斜,简直就是性能杀手(完整的解决方案)* 4、无论做什么功能,性能第一*       在传统的J2EE或者.NET后者PHP,软件/系统/网站开发中,我认为是架构和可维护性,可扩展性的重要*       程度,远远高于了性能,大量的分布式的架构,设计模式,代码的划分,类的划分(高并发网站除外)**        在大数据项目中,比如MapReduce、Hive、Spark、Storm,我认为性能的重要程度,远远大于一些代码*      的规范,和设计模式,代码的划分,类的划分;大数据,大数据,最重要的,就是性能*        主要就是因为大数据以及大数据项目的特点,决定了,大数据的程序和项目的速度,都比较慢*         如果不优先考虑性能的话,会导致一个大数据处理程序运行时间长度数个小时,甚至数十个小时*       此时,对于用户体验,简直就是一场灾难**      所以,推荐大数据项目,在开发和代码的架构中,优先考虑性能;其次考虑功能代码的划分、解耦合**      我们如果采用第一种实现方案,那么其实就是代码划分(解耦合、可维护)优先,设计优先*       如果采用第二种方案,那么其实就是性能优先**       讲了这么多,其实大家不要以为我是在岔开话题,大家不要觉得项目的课程,就是单纯的项目本身以及*         代码coding最重要,其实项目,我觉得,最重要的,除了技术本身和项目经验以外;非常重要的一点,就是*       积累了,处理各种问题的经验**//*** 获取排名前10的品类*/getTop10Category(task.getTaskId(),filteredSessionid2AggrInfoRDD,session2actionRDD);// 关闭Spark上下文sc.close();}/*** 获取SQLContext* 如果是在本地测试环境的话,那么就生成SQLContext对象* 如果是在生产环境运行的话,那么就生成HiveContext对象* @param sc SparkContext* @return SQLContext*/private static SQLContext getSQLContext(SparkContext sc) {boolean local = ConfigurationManager.getBoolean(Constants.SPARK_LOCAL);if(local) {return new SQLContext(sc);} else {return new HiveContext(sc);}}/*** 生成模拟数据(只有本地模式,才会去生成模拟数据)* @param sc* @param sqlContext*/private static void mockData(JavaSparkContext sc, SQLContext sqlContext) {boolean local = ConfigurationManager.getBoolean(Constants.SPARK_LOCAL);if(local) {MockData.mock(sc, sqlContext);}}/*** 获取指定日期范围内的用户访问行为数据* @param sqlContext SQLContext* @param taskParam 任务参数* @return 行为数据RDD*/private static JavaRDD<Row> getActionRDDByDateRange(SQLContext sqlContext, JSONObject taskParam) {String startDate = ParamUtils.getParam(taskParam, Constants.PARAM_START_DATE);String endDate = ParamUtils.getParam(taskParam, Constants.PARAM_END_DATE);String sql ="select * "+ "from user_visit_action "+ "where date>='" + startDate + "' "+ "and date<='" + endDate + "'";Dataset actionDF = sqlContext.sql(sql);System.out.println("actionDF");actionDF.show(10);return actionDF.javaRDD();}/*** 获取sessionid到访问行为数据的映射的RDD* @param actionRDD* @return*/public static JavaPairRDD<String,Row> getSessionid2ActionRDD(JavaRDD<Row> actionRDD){return actionRDD.mapToPair(new PairFunction<Row, String, Row>() {private static final long serialVersionUID = 1L;@Overridepublic Tuple2<String, Row> call(Row row) throws Exception {return new Tuple2<String,Row>(row.getString(2),row);}});}/*** 对行为数据按session粒度进行聚合* @param actionRDD 行为数据RDD* @return session粒度聚合数据*/private static JavaPairRDD<String, String> aggregateBySession(SQLContext sqlContext, JavaRDD<Row> actionRDD) {// 现在actionRDD中的元素是Row,一个Row就是一行用户访问行为记录,比如一次点击或者搜索// 我们现在需要将这个Row映射成<sessionid,Row>的格式JavaPairRDD<String, Row> sessionid2ActionRDD = actionRDD.mapToPair(/*** PairFunction* 第一个参数,相当于是函数的输入* 第二个参数和第三个参数,相当于是函数的输出(Tuple),分别是Tuple第一个和第二个值*/new PairFunction<Row, String, Row>() {private static final long serialVersionUID = 1L;@Overridepublic Tuple2<String, Row> call(Row row) throws Exception {return new Tuple2<String, Row>(row.getString(2), row);}});// 对行为数据按session粒度进行分组JavaPairRDD<String, Iterable<Row>> sessionid2ActionsRDD =sessionid2ActionRDD.groupByKey();// 对每一个session分组进行聚合,将session中所有的搜索词和点击品类都聚合起来// 到此为止,获取的数据格式,如下:<userid,partAggrInfo(sessionid,searchKeywords,clickCategoryIds)>JavaPairRDD<Long, String> userid2PartAggrInfoRDD = sessionid2ActionsRDD.mapToPair(new PairFunction<Tuple2<String,Iterable<Row>>, Long, String>() {private static final long serialVersionUID = 1L;@Overridepublic Tuple2<Long, String> call(Tuple2<String, Iterable<Row>> tuple)throws Exception {String sessionid = tuple._1;Iterator<Row> iterator = tuple._2.iterator();StringBuffer searchKeywordsBuffer = new StringBuffer("");StringBuffer clickCategoryIdsBuffer = new StringBuffer("");Long userid = null;// session的起始和结束时间Date startTime = null;Date endTime = null;// session的访问步长int stepLength = 0;// 遍历session所有的访问行为while(iterator.hasNext()) {// 提取每个访问行为的搜索词字段和点击品类字段Row row = iterator.next();if(userid == null) {userid = row.getLong(1);}System.out.println("row.toString"+row.toString());String searchKeyword = row.getString(5);Long clickCategoryId = row.getLong(6);//String clickCategoryId = String.valueOf(row.getLong(6));// 实际上这里要对数据说明一下// 并不是每一行访问行为都有searchKeyword何clickCategoryId两个字段的// 其实,只有搜索行为,是有searchKeyword字段的// 只有点击品类的行为,是有clickCategoryId字段的// 所以,任何一行行为数据,都不可能两个字段都有,所以数据是可能出现null值的// 我们决定是否将搜索词或点击品类id拼接到字符串中去// 首先要满足:不能是null值// 其次,之前的字符串中还没有搜索词或者点击品类idif(StringUtils.isNotEmpty(searchKeyword)) {if(!searchKeywordsBuffer.toString().contains(searchKeyword)) {searchKeywordsBuffer.append(searchKeyword + ",");}}if(clickCategoryId != Long.MAX_VALUE) {if(!clickCategoryIdsBuffer.toString().contains(String.valueOf(clickCategoryId))) {clickCategoryIdsBuffer.append(clickCategoryId + ",");}}// 计算session开始和结束时间Date actionTime = DateUtils.parseTime(row.getString(4));if(startTime == null) {startTime = actionTime;}if(endTime == null) {endTime = actionTime;}if(actionTime.before(startTime)) {startTime = actionTime;}if(actionTime.after(endTime)) {endTime = actionTime;}// 计算session访问步长stepLength++;}String searchKeywords = StringUtils.trimComma(searchKeywordsBuffer.toString());String clickCategoryIds = StringUtils.trimComma(clickCategoryIdsBuffer.toString());// 计算session访问时长(秒)long visitLength = (endTime.getTime() - startTime.getTime()) / 1000;// 大家思考一下// 我们返回的数据格式,即使<sessionid,partAggrInfo>// 但是,这一步聚合完了以后,其实,我们是还需要将每一行数据,跟对应的用户信息进行聚合// 问题就来了,如果是跟用户信息进行聚合的话,那么key,就不应该是sessionid// 就应该是userid,才能够跟<userid,Row>格式的用户信息进行聚合// 如果我们这里直接返回<sessionid,partAggrInfo>,还得再做一次mapToPair算子// 将RDD映射成<userid,partAggrInfo>的格式,那么就多此一举// 所以,我们这里其实可以直接,返回的数据格式,就是<userid,partAggrInfo>// 然后跟用户信息join的时候,将partAggrInfo关联上userInfo// 然后再直接将返回的Tuple的key设置成sessionid// 最后的数据格式,还是<sessionid,fullAggrInfo>// 聚合数据,用什么样的格式进行拼接?// 我们这里统一定义,使用key=value|key=valueString partAggrInfo = Constants.FIELD_SESSION_ID + "=" + sessionid + "|"+ Constants.FIELD_SEARCH_KEYWORDS + "=" + searchKeywords + "|"+ Constants.FIELD_CLICK_CATEGORY_IDS + "=" + clickCategoryIds + "|"+ Constants.FIELD_VISIT_LENGTH + "=" + visitLength + "|"+ Constants.FIELD_STEP_LENGTH + "=" + stepLength +"|"+Constants.FIELD_START_TIME + "=" +DateUtils.formateTime(startTime);//测试//System.out.println("partAggrInfo: "+ partAggrInfo);return new Tuple2<Long, String>(userid, partAggrInfo);}});// 查询所有用户数据,并映射成<userid,Row>的格式String sql = "select * from user_info";JavaRDD<Row> userInfoRDD = sqlContext.sql(sql).javaRDD();JavaPairRDD<Long, Row> userid2InfoRDD = userInfoRDD.mapToPair(new PairFunction<Row, Long, Row>() {private static final long serialVersionUID = 1L;@Overridepublic Tuple2<Long, Row> call(Row row) throws Exception {return new Tuple2<Long, Row>(row.getLong(0), row);}});// 将session粒度聚合数据,与用户信息进行joinJavaPairRDD<Long, Tuple2<String, Row>> userid2FullInfoRDD =userid2PartAggrInfoRDD.join(userid2InfoRDD);// 对join起来的数据进行拼接,并且返回<sessionid,fullAggrInfo>格式的数据JavaPairRDD<String, String> sessionid2FullAggrInfoRDD = userid2FullInfoRDD.mapToPair(new PairFunction<Tuple2<Long,Tuple2<String,Row>>, String, String>() {private static final long serialVersionUID = 1L;@Overridepublic Tuple2<String, String> call(Tuple2<Long, Tuple2<String, Row>> tuple)throws Exception {String partAggrInfo = tuple._2._1;Row userInfoRow = tuple._2._2;String sessionid = StringUtils.getFieldFromConcatString(partAggrInfo, "\\|", Constants.FIELD_SESSION_ID);int age = userInfoRow.getInt(3);String professional = userInfoRow.getString(4);String city = userInfoRow.getString(5);String sex = userInfoRow.getString(6);String fullAggrInfo = partAggrInfo + "|"+ Constants.FIELD_AGE + "=" + age + "|"+ Constants.FIELD_PROFESSIONAL + "=" + professional + "|"+ Constants.FIELD_CITY + "=" + city + "|"+ Constants.FIELD_SEX + "=" + sex;return new Tuple2<String, String>(sessionid, fullAggrInfo);}});return sessionid2FullAggrInfoRDD;}/*** 过滤session数据,并进行聚合统计* @param sessionid2AggrInfoRDD* @return*/private static JavaPairRDD<String, String> filterSessionAndAggrStat(JavaPairRDD<String, String> sessionid2AggrInfoRDD,final JSONObject taskParam,final Accumulator<String> sessionAggrStatAccumulator) {// 为了使用我们后面的ValieUtils,所以,首先将所有的筛选参数拼接成一个连接串// 此外,这里其实大家不要觉得是多此一举// 其实我们是给后面的性能优化埋下了一个伏笔String startAge = ParamUtils.getParam(taskParam, Constants.PARAM_START_AGE);String endAge = ParamUtils.getParam(taskParam, Constants.PARAM_END_AGE);String professionals = ParamUtils.getParam(taskParam, Constants.PARAM_PROFESSIONALS);String cities = ParamUtils.getParam(taskParam, Constants.PARAM_CITIES);String sex = ParamUtils.getParam(taskParam, Constants.PARAM_SEX);String keywords = ParamUtils.getParam(taskParam, Constants.PARAM_KEYWORDS);String categoryIds = ParamUtils.getParam(taskParam, Constants.PARAM_CATEGORY_IDS);String _parameter = (startAge != null ? Constants.PARAM_START_AGE + "=" + startAge + "|" : "")+ (endAge != null ? Constants.PARAM_END_AGE + "=" + endAge + "|" : "")+ (professionals != null ? Constants.PARAM_PROFESSIONALS + "=" + professionals + "|" : "")+ (cities != null ? Constants.PARAM_CITIES + "=" + cities + "|" : "")+ (sex != null ? Constants.PARAM_SEX + "=" + sex + "|" : "")+ (keywords != null ? Constants.PARAM_KEYWORDS + "=" + keywords + "|" : "")+ (categoryIds != null ? Constants.PARAM_CATEGORY_IDS + "=" + categoryIds: "");if(_parameter.endsWith("\\|")) {_parameter = _parameter.substring(0, _parameter.length() - 1);}final String parameter = _parameter;// 根据筛选参数进行过滤JavaPairRDD<String, String> filteredSessionid2AggrInfoRDD = sessionid2AggrInfoRDD.filter(new Function<Tuple2<String,String>, Boolean>() {private static final long serialVersionUID = 1L;@Overridepublic Boolean call(Tuple2<String, String> tuple) throws Exception {// 首先,从tuple中,获取聚合数据String aggrInfo = tuple._2;//System.out.println("Session筛选: "+ aggrInfo);// 接着,依次按照筛选条件进行过滤// 按照年龄范围进行过滤(startAge、endAge)if(!ValidUtils.between(aggrInfo, Constants.FIELD_AGE,parameter, Constants.PARAM_START_AGE, Constants.PARAM_END_AGE)) {return false;}// 按照职业范围进行过滤(professionals)// 互联网,IT,软件// 互联网if(!ValidUtils.in(aggrInfo, Constants.FIELD_PROFESSIONAL,parameter, Constants.PARAM_PROFESSIONALS)) {return false;}// 按照城市范围进行过滤(cities)// 北京,上海,广州,深圳// 成都if(!ValidUtils.in(aggrInfo, Constants.FIELD_CITY,parameter, Constants.PARAM_CITIES)) {return false;}// 按照性别进行过滤// 男/女// 男,女if(!ValidUtils.equal(aggrInfo, Constants.FIELD_SEX,parameter, Constants.PARAM_SEX)) {return false;}// 按照搜索词进行过滤// 我们的session可能搜索了 火锅,蛋糕,烧烤// 我们的筛选条件可能是 火锅,串串香,iphone手机// 那么,in这个校验方法,主要判定session搜索的词中,有任何一个,与筛选条件中// 任何一个搜索词相当,即通过if(!ValidUtils.in(aggrInfo, Constants.FIELD_SEARCH_KEYWORDS,parameter, Constants.PARAM_KEYWORDS)) {return false;}// 按照点击品类id进行过滤if(!ValidUtils.in(aggrInfo, Constants.FIELD_CLICK_CATEGORY_IDS,parameter, Constants.PARAM_CATEGORY_IDS)) {return false;}// 如果经过了之前的多个过滤条件之后,程序能够走到这里// 那么就说明,该session是通过了用户指定的筛选条件的,也就是需要保留的session// 那么就要对session的访问时长和访问步长,进行统计,根据session对应的范围// 进行相应的累加计数// 主要走到这一步,那么就是需要计数的sessionsessionAggrStatAccumulator.add(Constants.SESSION_COUNT);// 计算出session的访问时长和访问步长的范围,并进行相应的累加long visitLength = Long.valueOf(StringUtils.getFieldFromConcatString(aggrInfo, "\\|", Constants.FIELD_VISIT_LENGTH));long stepLength = Long.valueOf(StringUtils.getFieldFromConcatString(aggrInfo, "\\|", Constants.FIELD_STEP_LENGTH));calculateVisitLength(visitLength);calculateStepLength(stepLength);return true;}/*** 计算访问时长范围* @param visitLength*/private void calculateVisitLength(long visitLength) {if(visitLength >=1 && visitLength <= 3) {sessionAggrStatAccumulator.add(Constants.TIME_PERIOD_1s_3s);} else if(visitLength >=4 && visitLength <= 6) {sessionAggrStatAccumulator.add(Constants.TIME_PERIOD_4s_6s);} else if(visitLength >=7 && visitLength <= 9) {sessionAggrStatAccumulator.add(Constants.TIME_PERIOD_7s_9s);} else if(visitLength >=10 && visitLength <= 30) {sessionAggrStatAccumulator.add(Constants.TIME_PERIOD_10s_30s);} else if(visitLength > 30 && visitLength <= 60) {sessionAggrStatAccumulator.add(Constants.TIME_PERIOD_30s_60s);} else if(visitLength > 60 && visitLength <= 180) {sessionAggrStatAccumulator.add(Constants.TIME_PERIOD_1m_3m);} else if(visitLength > 180 && visitLength <= 600) {sessionAggrStatAccumulator.add(Constants.TIME_PERIOD_3m_10m);} else if(visitLength > 600 && visitLength <= 1800) {sessionAggrStatAccumulator.add(Constants.TIME_PERIOD_10m_30m);} else if(visitLength > 1800) {sessionAggrStatAccumulator.add(Constants.TIME_PERIOD_30m);}}/*** 计算访问步长范围* @param stepLength*/private void calculateStepLength(long stepLength) {if(stepLength >= 1 && stepLength <= 3) {sessionAggrStatAccumulator.add(Constants.STEP_PERIOD_1_3);} else if(stepLength >= 4 && stepLength <= 6) {sessionAggrStatAccumulator.add(Constants.STEP_PERIOD_4_6);} else if(stepLength >= 7 && stepLength <= 9) {sessionAggrStatAccumulator.add(Constants.STEP_PERIOD_7_9);} else if(stepLength >= 10 && stepLength <= 30) {sessionAggrStatAccumulator.add(Constants.STEP_PERIOD_10_30);} else if(stepLength > 30 && stepLength <= 60) {sessionAggrStatAccumulator.add(Constants.STEP_PERIOD_30_60);} else if(stepLength > 60) {sessionAggrStatAccumulator.add(Constants.STEP_PERIOD_60);}}});return filteredSessionid2AggrInfoRDD;}/*** 随机抽取session* @param sessionid2AggrInfoRDD*/private static void randomExtractSession(final long taskid,JavaPairRDD<String, String> sessionid2AggrInfoRDD,JavaPairRDD<String,Row> sessionid2actionRDD) {// 第一步,计算出每天每小时的session数量,获取<yyyy-MM-dd_HH,sessionid>格式的RDDJavaPairRDD<String, String> time2sessionidRDD = sessionid2AggrInfoRDD.mapToPair(new PairFunction<Tuple2<String,String>, String, String>() {private static final long serialVersionUID = 1L;@Overridepublic Tuple2<String, String> call(Tuple2<String, String> tuple) throws Exception {String aggrInfo = tuple._2;String startTime = StringUtils.getFieldFromConcatString(aggrInfo, "\\|", Constants.FIELD_START_TIME);String dateHour = DateUtils.getDateHour(startTime);return new Tuple2<String, String>(dateHour, aggrInfo);}});/*** 思考一下:这里我们不要着急写大量的代码,做项目的时候,一定要用脑子多思考** 每天每小时的session数量,然后计算出每天每小时的session抽取索引,遍历每天每小时session* 首先抽取出的session的聚合数据,写入session_random_extract表* 所以第一个RDD的value,应该是session聚合数据**/// 得到每天每小时的session数量Map<String, Long> countMap =  time2sessionidRDD.countByKey();//第二步,使用按时间比例随机抽取算法,计算出每天每小时需要抽取session的索引// 将<yyyy-MM-dd_HH,count>格式的map,转换成<yyyy-MM-dd,<HH,count>>的格式Map<String, Map<String, Long>> dateHourCountMap =new HashMap<String, Map<String, Long>>();for(Map.Entry<String, Long> countEntry : countMap.entrySet()) {String dateHour = countEntry.getKey();String date = dateHour.split("_")[0];String hour = dateHour.split("_")[1];System.out.print("dateHourCountMap : "+ dateHour);long count = Long.valueOf(String.valueOf(countEntry.getValue()));Map<String, Long> hourCountMap = dateHourCountMap.get(date);if(hourCountMap == null) {hourCountMap = new HashMap<String, Long>();dateHourCountMap.put(date, hourCountMap);}hourCountMap.put(hour, count);}System.out.print("dateHourCountMapSize : "+ dateHourCountMap.size());// 开始实现我们的按时间比例随机抽取算法// 总共要抽取100个session,先按照天数,进行平分int extractNumberPerDay = 100 / dateHourCountMap.size();//<date,<hour,(1,3,4,2103)>>Map<String,Map<String, List<Integer>>> dateHourExtractMap =new HashMap<String,Map<String,List<Integer>>>();Random random = new Random();for (Map.Entry<String,Map<String,Long>> dateHourCountEntry : dateHourCountMap.entrySet()){String date = dateHourCountEntry.getKey();Map<String,Long> hourCountMap = dateHourCountEntry.getValue();//计算出每天的session总数long sessionCount = 0L;for (long hourCount : hourCountMap.values()){sessionCount += hourCount;}Map<String,List<Integer>> hourExtractMap  = dateHourExtractMap.get(date);if (hourExtractMap == null){hourExtractMap = new HashMap<String,List<Integer>>();dateHourExtractMap.put(date,hourExtractMap);}//遍历每一个小时for (Map.Entry<String,Long> hourCountEntry : hourCountMap.entrySet()){String hour = hourCountEntry.getKey();long count = hourCountEntry.getValue();// 计算每个小时的session数量,占据当天总session数量的比例,直接乘以每天要抽取的数量// 就可以计算出,当前小时需要抽取的session数量int hourExtractNumber = (int)((double)count/(double) sessionCount)*extractNumberPerDay;if (hourExtractNumber > count){hourExtractNumber = (int)count;}//先获取当前小时的存放随机数的listList<Integer> extractIndexList = hourExtractMap.get(hour);if (extractIndexList == null){extractIndexList = new ArrayList<Integer>();hourExtractMap.put(hour,extractIndexList);}//生成上面计算出来的数量的随机数for (int i = 0; i < hourExtractNumber;i++){int extractIndex = random.nextInt((int)count);while (extractIndexList.contains(extractIndex)){extractIndex = random.nextInt((int)count);}extractIndexList.add(extractIndex);}}}/*** 第三步:遍历每天每小时的session,然后根据随机索引进行抽取*/// 执行groupByKey算子,得到<dateHour,(session aggrInfo)>JavaPairRDD<String,Iterable<String>> time2sessionsRDD = time2sessionidRDD.groupByKey();// 我们用flatMap算子,遍历所有的<dateHour,(session aggrInfo)>格式的数据// 然后呢,会遍历每天每小时的session// 如果发现某个session恰巧在我们指定的这天这小时的随机抽取索引上// 那么抽取该session,直接写入MySQL的random_extract_session表// 将抽取出来的session id返回回来,形成一个新的JavaRDD<String>// 然后最后一步,是用抽取出来的sessionid,去join它们的访问行为明细数据,写入session表JavaPairRDD<String, String> extractSessionidsRDD = time2sessionsRDD.flatMapToPair(new PairFlatMapFunction<Tuple2<String,Iterable<String>>, String, String>() {private static final long serialVersionUID = 1L;@Overridepublic Iterator<Tuple2<String, String>> call(Tuple2<String, Iterable<String>> tuple)throws Exception {List<Tuple2<String, String>> extractSessionids =new ArrayList<Tuple2<String, String>>();String dateHour = tuple._1;String date = dateHour.split("_")[0];String hour = dateHour.split("_")[1];Iterator<String> iterator = tuple._2.iterator();List<Integer> extractIndexList = dateHourExtractMap.get(date).get(hour);ISessionRandomExtractDAO sessionRandomExtractDAO =DAOFactory.getSessionRandomExtractDAO();int index = 0;while(iterator.hasNext()) {String sessionAggrInfo = iterator.next();if(extractIndexList.contains(index)) {String sessionid = StringUtils.getFieldFromConcatString(sessionAggrInfo, "\\|", Constants.FIELD_SESSION_ID);// 将数据写入MySQLSessionRandomExtract sessionRandomExtract = new SessionRandomExtract();sessionRandomExtract.setTaskid(taskid);sessionRandomExtract.setSessionid(sessionid);sessionRandomExtract.setStartTime(StringUtils.getFieldFromConcatString(sessionAggrInfo, "\\|", Constants.FIELD_START_TIME));sessionRandomExtract.setSerachKeyWords(StringUtils.getFieldFromConcatString(sessionAggrInfo, "\\|", Constants.FIELD_SEARCH_KEYWORDS));sessionRandomExtract.setClickCategoryIds(StringUtils.getFieldFromConcatString(sessionAggrInfo, "\\|", Constants.FIELD_CLICK_CATEGORY_IDS));sessionRandomExtractDAO.insert(sessionRandomExtract);// 将sessionid加入listextractSessionids.add(new Tuple2<String, String>(sessionid, sessionid));}index++;}return  extractSessionids.iterator();}});/*** 第四步:获取抽取出来的session的明细数据*/JavaPairRDD<String, Tuple2<String, Row>> extractSessionDetailRDD =extractSessionidsRDD.join(sessionid2actionRDD);extractSessionDetailRDD.foreach(new VoidFunction<Tuple2<String,Tuple2<String,Row>>>() {private static final long serialVersionUID = 1L;@Overridepublic void call(Tuple2<String, Tuple2<String, Row>> tuple) throws Exception {Row row = tuple._2._2;SessionDetail sessionDetail = new SessionDetail();sessionDetail.setTaskid(taskid);sessionDetail.setUserid(row.getLong(1));sessionDetail.setSessionid(row.getString(2));sessionDetail.setPageid(row.getLong(3));sessionDetail.setActionTime(row.getString(4));sessionDetail.setSeachKeyWord(row.getString(5));sessionDetail.setClickCategoryId(row.getLong(6));sessionDetail.setClickProductId(row.getLong(7));sessionDetail.setOrderCategoryIds(row.getString(8));sessionDetail.setOrderProductIds(row.getString(9));sessionDetail.setPayCategoryIds(row.getString(10));sessionDetail.setPayProductIds(row.getString(11));ISessionDetailDAO sessionDetailDAO = DAOFactory.getSessionDetailDAO();sessionDetailDAO.insert(sessionDetail);}});}/*** 计算各session范围占比,并写入MySQL* @param value* @param taskId*/private static void calculateAndPersistAggrStat(String value, long taskId) {//从Accumulate统计串中获取值long session_count = Long.valueOf(StringUtils.getFieldFromConcatString(value,"\\|",Constants.SESSION_COUNT));long visit_length_1s_3s = Long.valueOf(StringUtils.getFieldFromConcatString(value,"\\|",Constants.TIME_PERIOD_1s_3s));long visit_length_4s_6s = Long.valueOf(StringUtils.getFieldFromConcatString(value,"\\|",Constants.TIME_PERIOD_4s_6s));long visit_length_7s_9s = Long.valueOf(StringUtils.getFieldFromConcatString(value,"\\|",Constants.TIME_PERIOD_7s_9s));long visit_length_10s_30s = Long.valueOf(StringUtils.getFieldFromConcatString(value,"\\|",Constants.TIME_PERIOD_10s_30s));long visit_length_30s_60s = Long.valueOf(StringUtils.getFieldFromConcatString(value,"\\|",Constants.TIME_PERIOD_30s_60s));long visit_length_1m_3m = Long.valueOf(StringUtils.getFieldFromConcatString(value,"\\|",Constants.TIME_PERIOD_1m_3m));long visit_length_3m_10m = Long.valueOf(StringUtils.getFieldFromConcatString(value,"\\|",Constants.TIME_PERIOD_3m_10m));long visit_length_10m_30m = Long.valueOf(StringUtils.getFieldFromConcatString(value,"\\|",Constants.TIME_PERIOD_10m_30m));long visit_length_30m = Long.valueOf(StringUtils.getFieldFromConcatString(value,"\\|",Constants.TIME_PERIOD_30m));long step_length_1_3 = Long.valueOf(StringUtils.getFieldFromConcatString(value,"\\|",Constants.STEP_PERIOD_1_3));long step_length_4_6 = Long.valueOf(StringUtils.getFieldFromConcatString(value,"\\|",Constants.STEP_PERIOD_4_6));long step_length_7_9 = Long.valueOf(StringUtils.getFieldFromConcatString(value,"\\|",Constants.STEP_PERIOD_7_9));long step_length_10_30 = Long.valueOf(StringUtils.getFieldFromConcatString(value,"\\|",Constants.STEP_PERIOD_10_30));long step_length_30_60 = Long.valueOf(StringUtils.getFieldFromConcatString(value,"\\|",Constants.STEP_PERIOD_30_60));long step_length_60 = Long.valueOf(StringUtils.getFieldFromConcatString(value,"\\|",Constants.STEP_PERIOD_60));//计算各个访问时长和步长的占比double visit_length_1s_3s_ratio = NumberUtils.formatDouble((double) visit_length_1s_3s/session_count,2);double visit_length_4s_6s_ratio = NumberUtils.formatDouble((double)visit_length_4s_6s/session_count,2);double visit_length_7s_9s_ratio = NumberUtils.formatDouble((double)visit_length_7s_9s/session_count,2);double visit_length_10s_30s_ratio = NumberUtils.formatDouble((double)visit_length_10s_30s/session_count,2);double visit_length_30s_60s_ratio = NumberUtils.formatDouble((double)visit_length_30s_60s/session_count,2);double visit_length_1m_3m_ratio = NumberUtils.formatDouble((double)visit_length_1m_3m/session_count,2);double visit_length_3m_10m_ratio = NumberUtils.formatDouble((double)visit_length_3m_10m/session_count,2);double visit_length_10m_30m_ratio = NumberUtils.formatDouble((double)visit_length_10m_30m/session_count,2);double visit_length_30m_ratio = NumberUtils.formatDouble((double)visit_length_30m/session_count,2);double step_length_1_3_ratio = NumberUtils.formatDouble((double)step_length_1_3/session_count,2);double step_length_4_6_ratio = NumberUtils.formatDouble((double)step_length_4_6/session_count,2);double step_length_7_9_ratio = NumberUtils.formatDouble((double)step_length_7_9/session_count,2);double step_length_10_30_ratio = NumberUtils.formatDouble((double)step_length_10_30/session_count,2);double step_length_30_60_ratio = NumberUtils.formatDouble((double)step_length_30_60/session_count,2);double step_length_60_ratio = NumberUtils.formatDouble((double)step_length_60/session_count,2);//        将统计封装为Domain对象SessionAggrStat sessionAggrStat = new SessionAggrStat();sessionAggrStat.setSession_count(session_count);sessionAggrStat.setTaskid(taskId);sessionAggrStat.setVisit_length_1s_3s_ratio(visit_length_1s_3s_ratio);sessionAggrStat.setVisit_length_4s_6s_ratio(visit_length_4s_6s_ratio);sessionAggrStat.setVisit_length_7s_9s_ratio(visit_length_7s_9s_ratio);sessionAggrStat.setVisit_length_10s_30s_ratio(visit_length_10s_30s_ratio);sessionAggrStat.setVisit_length_30s_60s_ratio(visit_length_30s_60s_ratio);sessionAggrStat.setVisit_length_1m_3m_ratio(visit_length_1m_3m_ratio);sessionAggrStat.setVisit_length_3m_10m_ratio(visit_length_3m_10m_ratio);sessionAggrStat.setVisit_length_10m_30m_ratio(visit_length_10m_30m_ratio);sessionAggrStat.setVisit_length_30m_ratio(visit_length_30m_ratio);sessionAggrStat.setStep_length_1_3_ratio(step_length_1_3_ratio);sessionAggrStat.setStep_length_4_6_ratio(step_length_4_6_ratio);sessionAggrStat.setStep_length_7_9_ratio(step_length_7_9_ratio);sessionAggrStat.setStep_length_10_30_ratio(step_length_10_30_ratio);sessionAggrStat.setStep_length_30_60_ratio(step_length_30_60_ratio);sessionAggrStat.setStep_length_60_ratio(step_length_60_ratio);ISessionAggrStatDAO sessionAggrStatDAO = DAOFactory.getSessionAggrStatDAO();sessionAggrStatDAO.insert(sessionAggrStat);}/*** 获取Top10的品类* @param filteredSessionid2AggrInfoRDD* @param session2actionRDD*/private static void getTop10Category(long taskid,JavaPairRDD<String, String> filteredSessionid2AggrInfoRDD, JavaPairRDD<String, Row> session2actionRDD) {JavaPairRDD<String,Row> sessionid2detailRDD = filteredSessionid2AggrInfoRDD.join(session2actionRDD).mapToPair(new PairFunction<Tuple2<String, Tuple2<String, Row>>, String, Row>() {private static final long serialVersionUID = 1L;@Overridepublic Tuple2<String, Row> call(Tuple2<String, Tuple2<String, Row>> tuple) throws Exception {return new Tuple2<String,Row>(tuple._1,tuple._2._2);}});// 获取session访问过的所有品类id// 访问过:指的是,点击过、下单过、支付过的品类JavaPairRDD<Long,Long> categoryidRDD = sessionid2detailRDD.flatMapToPair(new PairFlatMapFunction<Tuple2<String, Row>, Long, Long>() {private static final long serialVersionUID = 1L;@Overridepublic Iterator<Tuple2<Long, Long>> call(Tuple2<String, Row> tuple) throws Exception {Row row = tuple._2;List<Tuple2<Long,Long>> list = new ArrayList<>();Long clickCategoryId = Long.valueOf(row.getLong(6));long maxid = 10240L;if (clickCategoryId != maxid){list.add(new Tuple2<Long,Long>(clickCategoryId,clickCategoryId));}String orderCategoryIds = row.getString(8);if (orderCategoryIds != null){String[] orderCategoryIdsSplited = orderCategoryIds.split(",");for (String orderCategory: orderCategoryIdsSplited){list.add(new Tuple2<Long,Long>(Long.valueOf(orderCategory),Long.valueOf(orderCategory)));}}String payCategoryIds = row.getString(10);if (payCategoryIds != null){String[] payCategoryIdsSplited = payCategoryIds.split(",");for (String payCategoryId : payCategoryIdsSplited){list.add(new Tuple2<Long,Long>(Long.valueOf(payCategoryId),Long.valueOf(payCategoryId)));}}return list.iterator();}});/*** 必须进行去重* 如果不去重,会出现重复的categoryid,排序会对重复的Categoryid的categoryInfo进行排序* 最后很可能拿到重复的数据*/categoryidRDD = categoryidRDD.distinct();/*** 第二步:计算各品类的点击、下单和支付的次数*/// 访问明细中,其中三种访问行为是:点击、下单和支付// 分别来计算各品类点击、下单和支付的次数,可以先对访问明细数据进行过滤// 分别过滤出点击、下单和支付行为,然后通过map、reduceByKey等算子来进行计算// 计算各个品类的点击次数JavaPairRDD<Long, Long> clickCategoryId2CountRDD =getClickCategoryId2CountRDD(sessionid2detailRDD);// 计算各个品类的下单次数JavaPairRDD<Long, Long> orderCategoryId2CountRDD =getOrderCategoryId2CountRDD(sessionid2detailRDD);// 计算各个品类的支付次数JavaPairRDD<Long, Long> payCategoryId2CountRDD =getPayCategoryId2CountRDD(sessionid2detailRDD);/*** 第三步:join各品类与它的点击、下单和支付的次数** categoryidRDD中,是包含了所有的符合条件的session,访问过的品类id** 上面分别计算出来的三份,各品类的点击、下单和支付的次数,可能不是包含所有品类的* 比如,有的品类,就只是被点击过,但是没有人下单和支付** 所以,这里,就不能使用join操作,要使用leftOuterJoin操作,就是说,如果categoryidRDD不能* join到自己的某个数据,比如点击、或下单、或支付次数,那么该categoryidRDD还是要保留下来的* 只不过,没有join到的那个数据,就是0了**/JavaPairRDD<Long, String> categoryid2countRDD = joinCategoryAndData(categoryidRDD, clickCategoryId2CountRDD, orderCategoryId2CountRDD,payCategoryId2CountRDD);/*** 第四步:自定义二次排序key*//*** 第五步:将数据映射成<CategorySortKey,info>格式的RDD,然后进行二次排序(降序)*/JavaPairRDD<CategorySortKey,String> sortKey2countRDD = categoryid2countRDD.mapToPair(new PairFunction<Tuple2<Long, String>, CategorySortKey, String>() {private static final long serialVersionUID = 1L;@Overridepublic Tuple2<CategorySortKey, String> call(Tuple2<Long, String> tuple) throws Exception {String countInfo = tuple._2;String click = StringUtils.getFieldFromConcatString(countInfo,"\\|",Constants.FIELD_CLICK_COUNT);if (click == null){click = "0";}long clickCount = Long.valueOf(click);String order =   StringUtils.getFieldFromConcatString(countInfo,"\\|",Constants.FIELD_ORDER_COUNT);if (order == null){order = "0";}long orderCount = Long.valueOf(order);String pay = StringUtils.getFieldFromConcatString(countInfo,"\\|",Constants.FIELD_PAY_COUNT);if (pay == null){pay = "0";}long payCount = Long.valueOf(pay);CategorySortKey categorySortKey = new CategorySortKey(clickCount,orderCount,payCount);return new Tuple2<CategorySortKey,String>(categorySortKey,countInfo);}});JavaPairRDD<CategorySortKey,String> sortedCategoryCountRDD = sortKey2countRDD.sortByKey(false);/*** 第六步:用take(10)取出top10热门品类,并写入MySQL*/ITop10CategoryDAO top10CategoryDAO = DAOFactory.getTop10CategoryDAO();List<Tuple2<CategorySortKey,String>> top10CategoryList = sortedCategoryCountRDD.take(10);for (Tuple2<CategorySortKey,String> tuple : top10CategoryList){String countInfo = tuple._2;long categoryid = Long.valueOf(StringUtils.getFieldFromConcatString(countInfo,"\\|", Constants.FIELD_CATEGORY_ID));long clickCount = Long.valueOf(StringUtils.getFieldFromConcatString(countInfo,"\\|", Constants.FIELD_CLICK_COUNT));long orderCount = Long.valueOf(StringUtils.getFieldFromConcatString(countInfo,"\\|", Constants.FIELD_ORDER_COUNT));long payCount = Long.valueOf(StringUtils.getFieldFromConcatString(countInfo,"\\|", Constants.FIELD_PAY_COUNT));Top10Category top10Category = new Top10Category();top10Category.setTaskid(taskid);top10Category.setCategoryid(categoryid);top10Category.setClickCount(clickCount);top10Category.setOrderCount(orderCount);top10Category.setPayCount(payCount);top10CategoryDAO.insert(top10Category);}}/*** 获取个品类点击次数RDD* @param sessionid2detailRDD* @return*/private static JavaPairRDD<Long,Long> getClickCategoryId2CountRDD(JavaPairRDD<String,Row> sessionid2detailRDD) {JavaPairRDD<String,Row> clickActionRDD = sessionid2detailRDD.filter(new Function<Tuple2<String, Row>, Boolean>() {private static final long serialVersionUID = 1L;@Overridepublic Boolean call(Tuple2<String, Row> tuple) throws Exception {Row row = tuple._2;return row.getLong(6) !=Long.MAX_VALUE ? true :false;}});JavaPairRDD<Long,Long> clickCategoryIdRDD = clickActionRDD.mapToPair(new PairFunction<Tuple2<String, Row>, Long, Long>() {private static final long serialVersionUID = 1L;@Overridepublic Tuple2<Long, Long> call(Tuple2<String, Row> tuple) throws Exception {long clickCategoryId = tuple._2.getLong(6);return new Tuple2<Long,Long>(clickCategoryId,1L);}});JavaPairRDD<Long,Long> clickCategoryId2CountRDD = clickCategoryIdRDD.reduceByKey(new Function2<Long, Long, Long>() {private static final long serialVersionID = 1L;@Overridepublic Long call(Long v1, Long v2) throws Exception {return v1+v2;}});return clickCategoryId2CountRDD;}/*** 获取品类下单次数的RDD* @param sessionid2detailRDD* @return*/private static JavaPairRDD<Long,Long> getOrderCategoryId2CountRDD(JavaPairRDD<String,Row> sessionid2detailRDD){JavaPairRDD<String,Row> orderAction = sessionid2detailRDD.filter(new Function<Tuple2<String, Row>, Boolean>() {private static final long serialVersionUID = 1L;@Overridepublic Boolean call(Tuple2<String, Row> tuple) throws Exception {Row row = tuple._2;return row.getString(8) != null ? true:false;}});JavaPairRDD<Long,Long> orderCategoryIdRDD = orderAction.flatMapToPair(new PairFlatMapFunction<Tuple2<String, Row>, Long, Long>() {private static final long serialVersionUID = 1L;@Overridepublic Iterator<Tuple2<Long, Long>> call(Tuple2<String, Row> tuple) throws Exception {Row row = tuple._2;String orderCategoryIds = row.getString(8);String[] orderCategoryIdsSplited = orderCategoryIds.split(",");List<Tuple2<Long,Long>> list = new ArrayList<Tuple2<Long, Long>>();for (String orderCategoryid: orderCategoryIdsSplited){list.add(new Tuple2<Long,Long>(Long.valueOf(orderCategoryid),1L));}return list.iterator();}});JavaPairRDD<Long,Long> orderCategoryId2CountRDD = orderCategoryIdRDD.reduceByKey(new Function2<Long, Long, Long>() {private static final long serialVersionID = 1L;@Overridepublic Long call(Long v1, Long v2) throws Exception {return v1+v2;}});return orderCategoryId2CountRDD;}/*** 获取品类支付次数的RDD* @param sessionid2detailRDD* @return*/private static JavaPairRDD<Long,Long> getPayCategoryId2CountRDD(JavaPairRDD<String,Row> sessionid2detailRDD){JavaPairRDD<String,Row> payAction = sessionid2detailRDD.filter(new Function<Tuple2<String, Row>, Boolean>() {private static final long serialVersionUID = 1L;@Overridepublic Boolean call(Tuple2<String, Row> tuple) throws Exception {Row row = tuple._2;return row.getString(10) != null ? true:false;}});JavaPairRDD<Long,Long> payCategoryIdRDD = payAction.flatMapToPair(new PairFlatMapFunction<Tuple2<String, Row>, Long, Long>() {private static final long serialVersionUID = 1L;@Overridepublic Iterator<Tuple2<Long, Long>> call(Tuple2<String, Row> tuple) throws Exception {Row row = tuple._2;String payCategoryIds = row.getString(10);// System.out.println("payCategoryIdRDD: "+ payCategoryIds);String[] payCategoryIdsSplited = payCategoryIds.split(",");List<Tuple2<Long,Long>> list = new ArrayList<Tuple2<Long, Long>>();for (String payCategoryid: payCategoryIdsSplited){list.add(new Tuple2<Long,Long>(Long.valueOf(payCategoryid),1L));}return list.iterator();}});JavaPairRDD<Long,Long> payCategoryId2CountRDD = payCategoryIdRDD.reduceByKey(new Function2<Long, Long, Long>() {private static final long serialVersionID = 1L;@Overridepublic Long call(Long v1, Long v2) throws Exception {return v1+v2;}});return payCategoryId2CountRDD;}/*** 连接品类与数据的RDD* @param categoryidRDD* @param clickCategoryId2CountRDD* @param orderCategoryId2CountRDD* @param payCategoryId2CountRDD* @return*/private static JavaPairRDD<Long,String> joinCategoryAndData(JavaPairRDD<Long,Long> categoryidRDD,JavaPairRDD<Long,Long> clickCategoryId2CountRDD,JavaPairRDD<Long,Long> orderCategoryId2CountRDD,JavaPairRDD<Long,Long> payCategoryId2CountRDD){// 解释一下,如果用leftOuterJoin,就可能出现,右边那个RDD中,join过来时,没有值// 所以Tuple中的第二个值用Optional<Long>类型,就代表,可能有值,可能没有值//JavaPairRDD<Long,Tuple2<Long,Optional<Long>>> tmpJoinRDD =categoryidRDD.leftOuterJoin(clickCategoryId2CountRDD);JavaPairRDD<Long, Tuple2<Long, Optional<Long>>> tmpJoinRDD =categoryidRDD.leftOuterJoin(clickCategoryId2CountRDD);JavaPairRDD<Long,String> tmpMapRDD= tmpJoinRDD.mapToPair(new PairFunction<Tuple2<Long, Tuple2<Long, Optional<Long>>>, Long, String>() {private static final long verialVersionUID = 1L;@Overridepublic Tuple2<Long, String> call(Tuple2<Long, Tuple2<Long, Optional<Long>>> tuple) throws Exception {long categoryId = tuple._1;Optional<Long> optional = tuple._2._2;long clickCount = 0L;if (optional.isPresent()){clickCount = optional.get();}String value = Constants.FIELD_CATEGORY_ID +"=" +categoryId+"|"+Constants.FIELD_CLICK_COUNT+"="+clickCount;return new Tuple2<Long,String>(categoryId,value);}});tmpMapRDD = tmpMapRDD.leftOuterJoin(orderCategoryId2CountRDD).mapToPair(new PairFunction<Tuple2<Long,Tuple2<String,Optional<Long>>>, Long, String>() {private static final long serialVersionUID = 1L;@Overridepublic Tuple2<Long, String> call(Tuple2<Long, Tuple2<String, Optional<Long>>> tuple)throws Exception {long categoryid = tuple._1;String value = tuple._2._1;Optional<Long> optional = tuple._2._2;long orderCount = 0L;if(optional.isPresent()) {orderCount = optional.get();}value = value + "|" + Constants.FIELD_ORDER_COUNT + "=" + orderCount;return new Tuple2<Long, String>(categoryid, value);}});tmpMapRDD = tmpMapRDD.leftOuterJoin(payCategoryId2CountRDD).mapToPair(new PairFunction<Tuple2<Long,Tuple2<String,Optional<Long>>>, Long, String>() {private static final long serialVersionUID = 1L;@Overridepublic Tuple2<Long, String> call(Tuple2<Long, Tuple2<String, Optional<Long>>> tuple)throws Exception {long categoryid = tuple._1;String value = tuple._2._1;Optional<Long> optional = tuple._2._2;long payCount = 0L;if(optional.isPresent()) {payCount = optional.get();}value = value + "|" + Constants.FIELD_PAY_COUNT + "=" + payCount;return new Tuple2<Long, String>(categoryid, value);}});return tmpMapRDD;}}

50.Spark大型电商项目-用户访问session分析-top10热门品类之本地测试相关推荐

  1. 43.Spark大型电商项目-用户访问session分析-top10热门品类之需求回顾以及实现思路分析

    目录 需求回顾 top10热门品类 二次排序 实现思路分析 本篇文章将记录用户访问session分析-top10热门品类之需求回顾以及实现思路分析. 需求回顾 top10热门品类 计算出来通过筛选条件 ...

  2. 112.Spark大型电商项目-广告点击流量实时统计-需求分析、技术方案设计以及数据设计

    目录 需求分析 技术方案设计 数据表设计 ad_user_click_count //用户点击广告表 ad_blacklist //用户黑名单 ad_stat  //广告状态表 ad_province ...

  3. 114.Spark大型电商项目-广告点击流量实时统计-使用高性能方式将实时计算结果写入MySQL中

    目录 误区 Spark Streaming foreachRDD的正确使用方式 对于这种实时计算程序的mysql插入,有两种pattern(模式) 代码 AdUserClickCount.java I ...

  4. 99.Spark大型电商项目-各区域热门商品统计-模块介绍

    目录 各区域热门商品统计 作业提交 大数据方向的职业发展规划 用户行为分析意义 本篇文章记录各区域热门商品统计-模块介绍. 各区域热门商品统计 需求:根据用户指定的日期范围,统计各个区域下的最热门的t ...

  5. 大型电商项目3.0实战+支付宝、微信支付项目实战

    须知:视频来源网络,侵权请联系删除! 大型电商项目3.0实战 获取方式 扫描下面二维码回复:A110 支付宝.微信支付项目实战 获取方式 扫描下面二维码回复:A106

  6. 电商项目需求整理和分析

    项目介绍 本项目命名为快购商城,分为两个端:用户端和管理端. 前台商城系统(用户端):包含首页门户.商品分类.新品上线.首页轮播.商品推荐.商品搜索.商品展示.购物车.订单结算.订单流程.提醒发货.订 ...

  7. 电商领域用户的留存分析

    在分析电商领域的过程中,数据分析师们常常会针对用户的留存情况予以分析说明,以此来分析用户对公司产品的粘性度等. 广告转化.购买.会员付费,我们都需要用户来完成.那么持续的广告转化.购买.会员付费,我们 ...

  8. 1. Python_Django项目之大型电商项目介绍

    1.开发项目目的 联系已掌握的知识点 发现新的知识点 掌握开发技巧 掌握项目结构 增加项目经验 2.所用技术 语言:Python3(Django4) 数据库:MySQL web服务器:Nginx+uw ...

  9. 5.大型电商项目之创建前端展示模板并调用

    1. templates前端模板的使用 1.1 templates前端模板的创建 首先,我们页面很多地方是相似的,这里就创建一个基础模板,不同的地方,对模板内容的block进行修改即可:对于相同的地方 ...

最新文章

  1. vs2017数据可视化建模_介绍数据可视化社区调查2017
  2. java selenium验证元素是否存在
  3. html设置了标签但是定义不了,在HTML标签管理器中设置不带元素ID的HTML中的事件...
  4. Tricks(五)—— Python 返回所有符合条件的下标
  5. 海洋CMS v6.53 v6.54命令执行
  6. 面试题之TCP与UDP的区别
  7. 【iqiqiya专版】CCTV(央视网)视频解析工具V1.0
  8. vue v-if未生效问题
  9. java开发软件怎么安装不了_java开发软件的安装
  10. (最完美)红米Note 5A的usb调试模式在哪里打开的步骤
  11. CyclicBarrier栅栏
  12. Linux快捷键大全(参考)
  13. 微型计算机结构框图,微型计算机系统结构图.doc
  14. 使用canvas画一个流星动画送给她吧
  15. 尤里先生查看陌生人朋友圈教程_微信怎么看陌生人朋友圈 强看陌生人朋友圈教程...
  16. npf拒绝访问的问题
  17. UVM中的TLM通信
  18. 如何实现OpenStack STT隧道(by quqi99)
  19. Eclipse 的常用快捷键
  20. ORACLE GOLDENGATE报错问题汇总

热门文章

  1. 简单介绍下C/S与B/S架构的异同
  2. Linux ALSA声卡驱动之二:Platform
  3. 基于CNN和序列标注的对联机器人 | 附数据集 开源代码
  4. java是什么?java能用来干嘛?
  5. java开发工程师报名费多少_java开发工程师薪水有多少
  6. 大争之世智造为基,瑞科智能将亮相第21届SIMM深圳机械展
  7. 解决MAC系统字体模糊发虚,并更换默认中文字体为微软雅黑
  8. 给定一个含n(n≥1)个整数的数组,请设计一个在时间上尽可能高效的算法,找出数组中未出现的最小正整数。
  9. [概率练习]n个小球放入m个盒子
  10. 安徽大学c语言作业题库,安徽大学C语言考试试卷