每日top3热点搜索词统计案例
数据格式:
日期,用户,搜索词,平台,版本
需求:
1、筛选出符合条件(城市,平台,版本)的数据
2、统计每天搜索uv排名前三的搜索词
3、按照每天的top3搜索词的uv搜索总次数,倒叙排序
4、将数据保存到hive表中
思路分析
1、针对原始数据(HDFS文件),获取输入RDD
2、使用filter算法,针对输入RDD中的数据,进行数据过滤,过滤出符合条件的数据
2.1普通的算法:直接在filter算法函数中,使用外部的查询条件(map),但是,这样做的话,是不是查询条件map,会发送到每一个task上一份副本,(性能不好)
2.2优化后的做法,将查询条件,封装为broadCast广播变量,在filter算法中使用broadCast广播变量。
3、将数据转换为(日期_搜索词,用户)格式,对他进行分组。然后再次进行映射,对每天每个搜索词的搜索用户进行去重操作,并统计去重后的数据,即为每天每个搜搜词的uv,最后获得(日期_搜搜词,uv)。
4、将得到的每天每个搜索词的uvRDD,映射为元素类型的RowRDD,转换为DataFrame。
5、注册为临时表,使用SparkSQL的开窗函数,统计每天的uv数量排名前三名的搜索词,以及他的搜索nv,最后获得一个DataFrame
6、将DataFrame转换为RDD,继续操作,按照每天日期来分组,并进行映射,计算出每天的top3搜索词的uv的总数,然后将uv总数作为key,将每天的top3搜索词以及搜索次数,拼接为一个字符串
7、按照每天的top3搜索总uv,进行排序,倒序排序
8、将排好的数据,再次映射回来,变成 日期_搜索词_uv的格式
9、再次映射为DataFrame,并将数据保存到hive中。
文本:
2018-10-1:leo:water:beijing:android:1.0
2018-10-1:leo1:water:beijing:android:1.0
2018-10-1:leo2:water:beijing:android:1.0
2018-10-1:jack:water:beijing:android:1.0
2018-10-1:jack1:water:beijing:android:1.0
2018-10-1:leo:seafood:beijing:android:1.0
2018-10-1:leo1:seafood:beijing:android:1.0
2018-10-1:leo2:seafood:beijing:android:1.0
2018-10-1:leo:food:beijing:android:1.0
2018-10-1:leo1:food:beijing:android:1.0
2018-10-1:leo2:meat:beijing:android:1.0
2018-10-2:leo:water:beijing:android:1.0
2018-10-2:leo1:water:beijing:android:1.0
2018-10-2:leo2:water:beijing:android:1.0
2018-10-2:jack:water:beijing:android:1.0
2018-10-2:leo1:seafood:beijing:android:1.0
2018-10-2:leo2:seafood:beijing:android:1.0
2018-10-2:leo3:seafood:beijing:android:1.0
2018-10-2:leo1:food:beijing:android:1.0
2018-10-2:leo2:food:beijing:android:1.0
2018-10-2:leo:meat:beijing:android:1.0
代码:
package com.bynear.spark_sql; import org.apache.spark.SparkConf; import org.apache.spark.api.java.JavaPairRDD; import org.apache.spark.api.java.JavaRDD; import org.apache.spark.api.java.JavaSparkContext; import org.apache.spark.api.java.function.FlatMapFunction; import org.apache.spark.api.java.function.Function; import org.apache.spark.api.java.function.PairFunction; import org.apache.spark.broadcast.Broadcast; import org.apache.spark.sql.DataFrame; import org.apache.spark.sql.Row; import org.apache.spark.sql.RowFactory; import org.apache.spark.sql.SQLContext; import org.apache.spark.sql.hive.HiveContext; import org.apache.spark.sql.types.DataTypes; import org.apache.spark.sql.types.StructField; import org.apache.spark.sql.types.StructType; import scala.Tuple2; import java.util.*; public class DailyTop3Keyword {public static void main(String[] args) {SparkConf conf = new SparkConf(); JavaSparkContext jsc = new JavaSparkContext(conf); SQLContext sqlContext = new HiveContext(jsc.sc()); // 伪造数据(这些数据可以来自mysql数据库) final HashMap<String, List<String>> queryParaMap = new HashMap<String, List<String>>(); queryParaMap.put("city", Arrays.asList("beijing")); queryParaMap.put("platform", Arrays.asList("android")); queryParaMap.put("version", Arrays.asList("1.0", "1.2", "2.0", "1.5")); // 将数据进行广播 final Broadcast<HashMap<String, List<String>>> queryParamMapBroadcast = jsc.broadcast(queryParaMap); // 读取文本 JavaRDD<String> rowRDD = jsc.textFile("hdfs://Spark01:9000/zjs/daily.txt"); // filter算子进行过滤 JavaRDD<String> filterRDD = rowRDD.filter(new Function<String, Boolean>() {@Override public Boolean call(String log) throws Exception {String[] logSplit = log.split(":"); String city = logSplit[3]; String platform = logSplit[4]; String version = logSplit[5]; HashMap<String, List<String>> queryParamMap = queryParamMapBroadcast.value(); List<String> cities = queryParamMap.get("city"); if (!cities.contains(city) && cities.size() > 0) {return false; }List<String> platforms = queryParamMap.get("platform"); if (!platforms.contains(platform)) {return false; }List<String> versions = queryParamMap.get("version"); if (!versions.contains(version)) {return false; }return true; }}); // 过滤出来的原始日志,映射为(日期_搜索词,用户)格式 JavaPairRDD<String, String> dateKeyWordUserRDD = filterRDD.mapToPair(new PairFunction<String, String, String>() {@Override public Tuple2<String, String> call(String log) throws Exception {String[] logSplit = log.split(":"); String date = logSplit[0]; String user = logSplit[1]; String keyword = logSplit[2]; return new Tuple2<String, String>(date + "_" + keyword, user); }}); // 进行分组,获取每天每个搜索词,有哪些用户搜索了(没有去重) JavaPairRDD<String, Iterable<String>> dateKeywordUsersRDD = dateKeyWordUserRDD.groupByKey(); List<Tuple2<String, Iterable<String>>> collect1 = dateKeywordUsersRDD.collect(); for (Tuple2<String, Iterable<String>> tuple2 : collect1) {System.out.println("进行分组,获取每天每个搜索词,有哪些用户搜索了(没有去重)" + tuple2._2); System.out.println(tuple2); }// 对每天每个搜索词的搜索用户 去重操作 获得前uv JavaPairRDD<String, Long> dateKeywordUvRDD = dateKeywordUsersRDD.mapToPair(new PairFunction<Tuple2<String, Iterable<String>>, String, Long>() {@Override public Tuple2<String, Long> call(Tuple2<String, Iterable<String>> dataKeywordUsers) throws Exception {String dateKeyword = dataKeywordUsers._1; Iterator<String> users = dataKeywordUsers._2.iterator(); // 去重 并统计去重后的数量 List<String> distinctUsers = new ArrayList<String>(); while (users.hasNext()) {String user = users.next(); if (!distinctUsers.contains(user)) {distinctUsers.add(user); }} // 获取uv long uv = distinctUsers.size(); // 日期_搜索词,用户个数 return new Tuple2<String, Long>(dateKeyword, uv); }}); List<Tuple2<String, Long>> collect2 = dateKeywordUvRDD.collect(); for (Tuple2<String, Long> stringLongTuple2 : collect2) {System.out.println("对每天每个搜索词的搜索用户 去重操作 获得前uv"); System.out.println(stringLongTuple2); }// 将每天每个搜索词的uv数据,转换成DataFrame JavaRDD<Row> dateKeywordUvRowRDD = dateKeywordUvRDD.map(new Function<Tuple2<String, Long>, Row>() {@Override public Row call(Tuple2<String, Long> dateKeywordUv) throws Exception {String date = dateKeywordUv._1.split("_")[0]; String keyword = dateKeywordUv._1.split("_")[1]; long uv = dateKeywordUv._2; return RowFactory.create(date, keyword, uv); }}); ArrayList<StructField> fields = new ArrayList<StructField>(); fields.add(DataTypes.createStructField("date", DataTypes.StringType, true)); fields.add(DataTypes.createStructField("keyword", DataTypes.StringType, true)); fields.add(DataTypes.createStructField("uv", DataTypes.LongType, true)); StructType structType = DataTypes.createStructType(fields); DataFrame dateKeywordUvDF = sqlContext.createDataFrame(dateKeywordUvRowRDD, structType); dateKeywordUvDF.registerTempTable("sales"); // 使用开窗函数,统计每天搜索uv排名前三的热点搜索词 // 日期 搜索词 人数个数 前三名 final DataFrame dailyTop3KeyWordDF = sqlContext.sql("select date,keyword,uv from (select date, keyword, uv, row_number() over (partition by date order by uv DESC ) rank from sales ) tmp_sales where rank <=3"); // 将DataFrame转换为RDD, 映射, JavaRDD<Row> dailyTop3KeyWordRDD = dailyTop3KeyWordDF.javaRDD(); JavaPairRDD<String, String> dailyTop3KeywordRDD = dailyTop3KeyWordRDD.mapToPair(new PairFunction<Row, String, String>() {@Override public Tuple2<String, String> call(Row row) throws Exception {String date = String.valueOf(row.get(0)); String keyword = String.valueOf(row.get(1)); String uv = String.valueOf(row.get(2)); // 映射为 日期 搜索词_总个数 return new Tuple2<String, String>(date, keyword + "_" + uv); }}); List<Tuple2<String, String>> collect = dailyTop3KeywordRDD.collect(); for (Tuple2<String, String> stringStringTuple2 : collect) {System.out.println("开窗函数操作"); System.out.println(stringStringTuple2); }// 根据 日期分组 JavaPairRDD<String, Iterable<String>> top3DateKeywordsRDD = dailyTop3KeywordRDD.groupByKey(); // 进行映射 JavaPairRDD<Long, String> uvDateKeywordsRDD = top3DateKeywordsRDD.mapToPair(new PairFunction<Tuple2<String, Iterable<String>>, Long, String>() {@Override public Tuple2<Long, String> call(Tuple2<String, Iterable<String>> tuple) throws Exception {String date = tuple._1; // 搜索词_总个数 集合 Iterator<String> KeyWordUviterator = tuple._2.iterator(); long totalUv = 0L; String dateKeyword = date; while (KeyWordUviterator.hasNext()) { // 搜索词_个数 String keywoarUv = KeyWordUviterator.next(); Long uv = Long.valueOf(keywoarUv.split("_")[1]); totalUv += uv; dateKeyword = dateKeyword + "," + keywoarUv; }return new Tuple2<Long, String>(totalUv, dateKeyword); }}); JavaPairRDD<Long, String> sortedUvDateKeywordsRDD = uvDateKeywordsRDD.sortByKey(false); List<Tuple2<Long, String>> rows = sortedUvDateKeywordsRDD.collect(); for (Tuple2<Long, String> row : rows) {System.out.println(row._2 + " " + row._1); }// 映射 JavaRDD<Row> resultRDD = sortedUvDateKeywordsRDD.flatMap(new FlatMapFunction<Tuple2<Long, String>, Row>() {@Override public Iterable<Row> call(Tuple2<Long, String> tuple) throws Exception {String dateKeywords = tuple._2; String[] dateKeywordsSplit = dateKeywords.split(","); String date = dateKeywordsSplit[0]; ArrayList<Row> rows = new ArrayList<Row>(); rows.add(RowFactory.create(date, dateKeywordsSplit[1].split("_")[0], Long.valueOf(dateKeywordsSplit[1].split("_")[1]))); rows.add(RowFactory.create(date, dateKeywordsSplit[2].split("_")[0], Long.valueOf(dateKeywordsSplit[2].split("_")[1]))); rows.add(RowFactory.create(date, dateKeywordsSplit[3].split("_")[0], Long.valueOf(dateKeywordsSplit[3].split("_")[1]))); return rows; }}); DataFrame finalDF = sqlContext.createDataFrame(resultRDD, structType); List<Row> rows1 = finalDF.javaRDD().collect(); for (Row row : rows1) {System.out.println(row); }jsc.stop(); } }
注意点:
1、如果文本案例使用的是txt编辑,将文本保存ANSI格式,否则在groupByKey的时候,第一行默认会出现一个空格,分组失败。最开始使用的是UTF-8格式
2、文本的最后禁止出现空行,否则在split的时候会报错,出现数组越界的错误。
3、使用到窗口函数的时候,必须使用到HiveContext方法,HiveContext使用到的是SparkContext,使用使用jsc.sc()
运行结果:
进行分组,获取每天每个搜索词,有哪些用户搜索了(没有去重)[leo]
(2018-10-2_meat,[leo])
进行分组,获取每天每个搜索词,有哪些用户搜索了(没有去重)[leo, leo1, leo2, jack]
(2018-10-2_water,[leo, leo1, leo2, jack])
进行分组,获取每天每个搜索词,有哪些用户搜索了(没有去重)[leo, leo1]
(2018-10-1_food,[leo, leo1])
进行分组,获取每天每个搜索词,有哪些用户搜索了(没有去重)[leo1, leo2, leo3]
(2018-10-2_seafood,[leo1, leo2, leo3])
进行分组,获取每天每个搜索词,有哪些用户搜索了(没有去重)[leo, leo1, leo2, jack, jack1]
(2018-10-1_water,[leo, leo1, leo2, jack, jack1])
进行分组,获取每天每个搜索词,有哪些用户搜索了(没有去重)[leo2]
(2018-10-1_meat,[leo2])
进行分组,获取每天每个搜索词,有哪些用户搜索了(没有去重)[leo, leo1, leo2]
(2018-10-1_seafood,[leo, leo1, leo2])
进行分组,获取每天每个搜索词,有哪些用户搜索了(没有去重)[leo1, leo2]
(2018-10-2_food,[leo1, leo2])
对每天每个搜索词的搜索用户 去重操作 获得前uv
(2018-10-2_meat,1)
对每天每个搜索词的搜索用户 去重操作 获得前uv
(2018-10-2_water,4)
对每天每个搜索词的搜索用户 去重操作 获得前uv
(2018-10-1_food,2)
对每天每个搜索词的搜索用户 去重操作 获得前uv
(2018-10-2_seafood,3)
对每天每个搜索词的搜索用户 去重操作 获得前uv
(2018-10-1_water,5)
对每天每个搜索词的搜索用户 去重操作 获得前uv
(2018-10-1_meat,1)
对每天每个搜索词的搜索用户 去重操作 获得前uv
(2018-10-1_seafood,3)
对每天每个搜索词的搜索用户 去重操作 获得前uv
(2018-10-2_food,2)
窗函数操作
(2018-10-1,water_5)
开窗函数操作
(2018-10-1,seafood_3)
开窗函数操作
(2018-10-1,food_2)
开窗函数操作
(2018-10-2,water_4)
开窗函数操作
(2018-10-2,seafood_3)
开窗函数操作
(2018-10-2,food_2)
最终结果
[2018-10-1,water,5]
[2018-10-1,seafood,3]
[2018-10-1,food,2]
[2018-10-2,water,4]
[2018-10-2,seafood,3]
[2018-10-2,food,2]
每日top3热点搜索词统计案例相关推荐
- 通过Spark Streaming的window操作实战模拟热点搜索词案例实战
本博文主要内容包括: 1.在线热点搜索词实现解析 2.SparkStreaming 利用reduceByKeyAndWindow实现在线热点搜索词实战 一:在线热点搜索词实现解析 背景描述:在社交网络 ...
- reduceByKeyAndWindow基于滑动窗口的热点搜索词实时统计(Scala版本)
package SparkStreaming import org.apache.spark.SparkConf import org.apache.spark.streaming.{Seconds, ...
- 拼多多搜索词统计 API接口操作展示说明
通过次接口 item_search_data可以获取搜索词统计,同时了解到该店铺的(num_iid商品ID.title商品标题.price价格.orginal_price原价.nick卖家昵称.num ...
- 拼多多API—获取商品详情、关键词获取取商品、获得搜索词统计获得搜索词推荐
1.item_get - 根据ID取商品详情: 返回数据部分截图: 点击免费测试 2.item_search - 根据关键词取商品列表: 返回数据部分截图: 3.item_search_data - ...
- spark 项目应用-topn搜索热词统计
本文章通过spark读取hive数据,分析top热点搜索词 import java.util.ArrayList; import java.util.Arrays; import java.util. ...
- 《Spark SQL大数据实例开发》9.2 综合案例实战——电商网站搜索排名统计
<Spark SQL大数据实例开发>9.2 综合案例实战--电商网站搜索排名统计 9.2.1 案例概述 本节演示一个网站搜索综合案例:以京东为例,用户登录京东网站,在搜索栏中输入搜 ...
- 73、Spark SQL之开窗函数以及top3销售额统计案例实战
开窗函数以及top3销售额统计案例实战 Spark 1.4.x版本以后,为Spark SQL和DataFrame引入了开窗函数,比如最经典,最常用的,row_number(),可以让我们实现分组取to ...
- 百度统计出现不属于自己网站的域名搜索词及数据怎么处理
百度统计出现不属于自己网站的域名搜索词及数据怎么处理? 这种情况多半是你网站的百度统计代码别别人利用了,建议你找程序将百度统计代码加密传输这是一种解决方式 另一种设置网站黑白名单,进入百度统计管理界面 ...
- 百度统计搜索词是广告怎么办?建议设置过滤规则排除
最近经常在百度统计中看到有很多垃圾广告的搜索词,甚至连 Top 搜索词.Top 来源网站.Top 入口页面中都出现很多垃圾广告.比如老古的一个测试网站就出现这种情况,具体如下图所示: 据了解,网络上有 ...
最新文章
- 牛客网_剑指Offer_Python实现_更新中
- Matlab创建特殊数组学习
- python opencv 等比例调整(缩放)图片分辨率大小代码 cv2.resize()
- python 为什么要用astype()函数对numpy数据类型进行转换,而不直接指定其dtype?float(64) float(32) int(64) int(32)(转换为整型int)
- 广搜--(搜索的第一道题)图像有用区域
- rsync推拉模型及结合inotify实现推模型自动同步
- 普里姆从不同顶点出发_来自三个不同聚类分析的三个不同教训数据科学的顶点...
- leetcode49. 字母异位词分组
- 【kafka】kafka Producer Metadata概述及源码分析
- Alibaba Druid 源码阅读(一) 数据库连接池初步
- 关于RestTemplate的小笔记
- 可行性分析报告 模板
- 还在用 ZXing ? 试试华为统一扫码服务吧!
- 将Excel数据导入到MySQL数据库
- python导入包总是失败
- android9.0自动使用24小时制
- 安卓手机拨号键盘隐藏工程代码大全
- 在ArcGIS中快速搭建三维场景教程(从数据获取到软件制作)
- python三维图能画地图_使用Python绘制地图的三大秘密武器
- 计算机组成原理 CPU 结构和功能
热门文章
- vscode python 自动补全_利用CodeBERT,这个VS Code扩展可以自动生成Python文档字符串...
- Eclipse下Tomcat服务器配置和使用
- P4721 【模板】分治 FFT
- 【雅礼集训2017】字符串【后缀自动机】【数据分治】
- 【CF1189D】Add on a Tree【结论】【构造】
- 牛客题霸 [滑动窗口的最大值] C++题解/答案
- CF1472(div3):总结
- P5371-[SNOI2019]纸牌【矩阵乘法】
- P4100-[HEOI2013]钙铁锌硒维生素【矩阵求逆,最大匹配】
- nssl1249-C【数论】