数据格式:

日期,用户,搜索词,平台,版本

需求:

1、筛选出符合条件(城市,平台,版本)的数据

2、统计每天搜索uv排名前三的搜索词

3、按照每天的top3搜索词的uv搜索总次数,倒叙排序

4、将数据保存到hive表中

思路分析

1、针对原始数据(HDFS文件),获取输入RDD

2、使用filter算法,针对输入RDD中的数据,进行数据过滤,过滤出符合条件的数据

2.1普通的算法:直接在filter算法函数中,使用外部的查询条件(map),但是,这样做的话,是不是查询条件map,会发送到每一个task上一份副本,(性能不好)

2.2优化后的做法,将查询条件,封装为broadCast广播变量,在filter算法中使用broadCast广播变量。

3、将数据转换为(日期_搜索词,用户)格式,对他进行分组。然后再次进行映射,对每天每个搜索词的搜索用户进行去重操作,并统计去重后的数据,即为每天每个搜搜词的uv,最后获得(日期_搜搜词,uv)。

4、将得到的每天每个搜索词的uvRDD,映射为元素类型的RowRDD,转换为DataFrame。

5、注册为临时表,使用SparkSQL的开窗函数,统计每天的uv数量排名前三名的搜索词,以及他的搜索nv,最后获得一个DataFrame

6、将DataFrame转换为RDD,继续操作,按照每天日期来分组,并进行映射,计算出每天的top3搜索词的uv的总数,然后将uv总数作为key,将每天的top3搜索词以及搜索次数,拼接为一个字符串

7、按照每天的top3搜索总uv,进行排序,倒序排序

8、将排好的数据,再次映射回来,变成 日期_搜索词_uv的格式

9、再次映射为DataFrame,并将数据保存到hive中。

文本:

2018-10-1:leo:water:beijing:android:1.0
2018-10-1:leo1:water:beijing:android:1.0
2018-10-1:leo2:water:beijing:android:1.0
2018-10-1:jack:water:beijing:android:1.0
2018-10-1:jack1:water:beijing:android:1.0
2018-10-1:leo:seafood:beijing:android:1.0
2018-10-1:leo1:seafood:beijing:android:1.0
2018-10-1:leo2:seafood:beijing:android:1.0
2018-10-1:leo:food:beijing:android:1.0
2018-10-1:leo1:food:beijing:android:1.0
2018-10-1:leo2:meat:beijing:android:1.0
2018-10-2:leo:water:beijing:android:1.0
2018-10-2:leo1:water:beijing:android:1.0
2018-10-2:leo2:water:beijing:android:1.0
2018-10-2:jack:water:beijing:android:1.0
2018-10-2:leo1:seafood:beijing:android:1.0
2018-10-2:leo2:seafood:beijing:android:1.0
2018-10-2:leo3:seafood:beijing:android:1.0
2018-10-2:leo1:food:beijing:android:1.0
2018-10-2:leo2:food:beijing:android:1.0
2018-10-2:leo:meat:beijing:android:1.0

代码:

package com.bynear.spark_sql;

import org.apache.spark.SparkConf;
import org.apache.spark.api.java.JavaPairRDD;
import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.api.java.JavaSparkContext;
import org.apache.spark.api.java.function.FlatMapFunction;
import org.apache.spark.api.java.function.Function;
import org.apache.spark.api.java.function.PairFunction;
import org.apache.spark.broadcast.Broadcast;
import org.apache.spark.sql.DataFrame;
import org.apache.spark.sql.Row;
import org.apache.spark.sql.RowFactory;
import org.apache.spark.sql.SQLContext;
import org.apache.spark.sql.hive.HiveContext;
import org.apache.spark.sql.types.DataTypes;
import org.apache.spark.sql.types.StructField;
import org.apache.spark.sql.types.StructType;
import scala.Tuple2;

import java.util.*;

public class DailyTop3Keyword {public static void main(String[] args) {SparkConf conf = new SparkConf();
        JavaSparkContext jsc = new JavaSparkContext(conf);
        SQLContext sqlContext = new HiveContext(jsc.sc());
//        伪造数据(这些数据可以来自mysql数据库)
        final HashMap<String, List<String>> queryParaMap = new HashMap<String, List<String>>();
        queryParaMap.put("city", Arrays.asList("beijing"));
        queryParaMap.put("platform", Arrays.asList("android"));
        queryParaMap.put("version", Arrays.asList("1.0", "1.2", "2.0", "1.5"));
//      将数据进行广播
        final Broadcast<HashMap<String, List<String>>> queryParamMapBroadcast = jsc.broadcast(queryParaMap);
//      读取文本
        JavaRDD<String> rowRDD = jsc.textFile("hdfs://Spark01:9000/zjs/daily.txt");
//      filter算子进行过滤
        JavaRDD<String> filterRDD = rowRDD.filter(new Function<String, Boolean>() {@Override
            public Boolean call(String log) throws Exception {String[] logSplit = log.split(":");
                String city = logSplit[3];
                String platform = logSplit[4];
                String version = logSplit[5];
                HashMap<String, List<String>> queryParamMap = queryParamMapBroadcast.value();
                List<String> cities = queryParamMap.get("city");
                if (!cities.contains(city) && cities.size() > 0) {return false;
                }List<String> platforms = queryParamMap.get("platform");
                if (!platforms.contains(platform)) {return false;
                }List<String> versions = queryParamMap.get("version");
                if (!versions.contains(version)) {return false;
                }return true;
            }});
//        过滤出来的原始日志,映射为(日期_搜索词,用户)格式
        JavaPairRDD<String, String> dateKeyWordUserRDD = filterRDD.mapToPair(new PairFunction<String, String, String>() {@Override
            public Tuple2<String, String> call(String log) throws Exception {String[] logSplit = log.split(":");
                String date = logSplit[0];
                String user = logSplit[1];
                String keyword = logSplit[2];
                return new Tuple2<String, String>(date + "_" + keyword, user);
            }});
//        进行分组,获取每天每个搜索词,有哪些用户搜索了(没有去重)
        JavaPairRDD<String, Iterable<String>> dateKeywordUsersRDD = dateKeyWordUserRDD.groupByKey();
        List<Tuple2<String, Iterable<String>>> collect1 = dateKeywordUsersRDD.collect();
        for (Tuple2<String, Iterable<String>> tuple2 : collect1) {System.out.println("进行分组,获取每天每个搜索词,有哪些用户搜索了(没有去重)" + tuple2._2);
            System.out.println(tuple2);
        }//        对每天每个搜索词的搜索用户  去重操作  获得前uv
        JavaPairRDD<String, Long> dateKeywordUvRDD = dateKeywordUsersRDD.mapToPair(new PairFunction<Tuple2<String, Iterable<String>>, String, Long>() {@Override
                    public Tuple2<String, Long> call(Tuple2<String, Iterable<String>> dataKeywordUsers) throws Exception {String dateKeyword = dataKeywordUsers._1;
                        Iterator<String> users = dataKeywordUsers._2.iterator();
//                去重   并统计去重后的数量
                        List<String> distinctUsers = new ArrayList<String>();
                        while (users.hasNext()) {String user = users.next();
                            if (!distinctUsers.contains(user)) {distinctUsers.add(user);
                            }}
//              获取uv
                        long uv = distinctUsers.size();
//                日期_搜索词,用户个数
                        return new Tuple2<String, Long>(dateKeyword, uv);
                    }});
        List<Tuple2<String, Long>> collect2 = dateKeywordUvRDD.collect();
        for (Tuple2<String, Long> stringLongTuple2 : collect2) {System.out.println("对每天每个搜索词的搜索用户  去重操作  获得前uv");
            System.out.println(stringLongTuple2);
        }//        将每天每个搜索词的uv数据,转换成DataFrame
        JavaRDD<Row> dateKeywordUvRowRDD = dateKeywordUvRDD.map(new Function<Tuple2<String, Long>, Row>() {@Override
            public Row call(Tuple2<String, Long> dateKeywordUv) throws Exception {String date = dateKeywordUv._1.split("_")[0];
                String keyword = dateKeywordUv._1.split("_")[1];
                long uv = dateKeywordUv._2;
                return RowFactory.create(date, keyword, uv);
            }});
        ArrayList<StructField> fields = new ArrayList<StructField>();
        fields.add(DataTypes.createStructField("date", DataTypes.StringType, true));
        fields.add(DataTypes.createStructField("keyword", DataTypes.StringType, true));
        fields.add(DataTypes.createStructField("uv", DataTypes.LongType, true));
        StructType structType = DataTypes.createStructType(fields);
        DataFrame dateKeywordUvDF = sqlContext.createDataFrame(dateKeywordUvRowRDD, structType);
        dateKeywordUvDF.registerTempTable("sales");
//        使用开窗函数,统计每天搜索uv排名前三的热点搜索词
//        日期  搜索词   人数个数  前三名
        final DataFrame dailyTop3KeyWordDF = sqlContext.sql("select date,keyword,uv from (select date, keyword, uv, row_number() over (partition by date order by uv DESC ) rank from sales ) tmp_sales where rank <=3");
//        将DataFrame转换为RDD, 映射,
        JavaRDD<Row> dailyTop3KeyWordRDD = dailyTop3KeyWordDF.javaRDD();

        JavaPairRDD<String, String> dailyTop3KeywordRDD = dailyTop3KeyWordRDD.mapToPair(new PairFunction<Row, String, String>() {@Override
            public Tuple2<String, String> call(Row row) throws Exception {String date = String.valueOf(row.get(0));
                String keyword = String.valueOf(row.get(1));
                String uv = String.valueOf(row.get(2));
//                映射为  日期  搜索词_总个数
                return new Tuple2<String, String>(date, keyword + "_" + uv);
            }});

        List<Tuple2<String, String>> collect = dailyTop3KeywordRDD.collect();
        for (Tuple2<String, String> stringStringTuple2 : collect) {System.out.println("开窗函数操作");
            System.out.println(stringStringTuple2);
        }//          根据 日期分组
        JavaPairRDD<String, Iterable<String>> top3DateKeywordsRDD = dailyTop3KeywordRDD.groupByKey();
//        进行映射
        JavaPairRDD<Long, String> uvDateKeywordsRDD = top3DateKeywordsRDD.mapToPair(new PairFunction<Tuple2<String, Iterable<String>>, Long, String>() {@Override
            public Tuple2<Long, String> call(Tuple2<String, Iterable<String>> tuple) throws Exception {String date = tuple._1;
//                搜索词_总个数  集合
                Iterator<String> KeyWordUviterator = tuple._2.iterator();
                long totalUv = 0L;
                String dateKeyword = date;
                while (KeyWordUviterator.hasNext()) {
//                    搜索词_个数
                    String keywoarUv = KeyWordUviterator.next();
                    Long uv = Long.valueOf(keywoarUv.split("_")[1]);
                    totalUv += uv;
                    dateKeyword = dateKeyword + "," + keywoarUv;
                }return new Tuple2<Long, String>(totalUv, dateKeyword);
            }});
        JavaPairRDD<Long, String> sortedUvDateKeywordsRDD = uvDateKeywordsRDD.sortByKey(false);
        List<Tuple2<Long, String>> rows = sortedUvDateKeywordsRDD.collect();
        for (Tuple2<Long, String> row : rows) {System.out.println(row._2 + "    " + row._1);
        }//        映射
        JavaRDD<Row> resultRDD = sortedUvDateKeywordsRDD.flatMap(new FlatMapFunction<Tuple2<Long, String>, Row>() {@Override
            public Iterable<Row> call(Tuple2<Long, String> tuple) throws Exception {String dateKeywords = tuple._2;
                String[] dateKeywordsSplit = dateKeywords.split(",");
                String date = dateKeywordsSplit[0];
                ArrayList<Row> rows = new ArrayList<Row>();
                rows.add(RowFactory.create(date, dateKeywordsSplit[1].split("_")[0],
                        Long.valueOf(dateKeywordsSplit[1].split("_")[1])));

                rows.add(RowFactory.create(date, dateKeywordsSplit[2].split("_")[0],
                        Long.valueOf(dateKeywordsSplit[2].split("_")[1])));

                rows.add(RowFactory.create(date, dateKeywordsSplit[3].split("_")[0],
                        Long.valueOf(dateKeywordsSplit[3].split("_")[1])));

                return rows;
            }});
        DataFrame finalDF = sqlContext.createDataFrame(resultRDD, structType);
        List<Row> rows1 = finalDF.javaRDD().collect();
        for (Row row : rows1) {System.out.println(row);
        }jsc.stop();

    }
}

注意点:

1、如果文本案例使用的是txt编辑,将文本保存ANSI格式,否则在groupByKey的时候,第一行默认会出现一个空格,分组失败。最开始使用的是UTF-8格式

2、文本的最后禁止出现空行,否则在split的时候会报错,出现数组越界的错误。

3、使用到窗口函数的时候,必须使用到HiveContext方法,HiveContext使用到的是SparkContext,使用使用jsc.sc()

运行结果:

进行分组,获取每天每个搜索词,有哪些用户搜索了(没有去重)[leo]
(2018-10-2_meat,[leo])
进行分组,获取每天每个搜索词,有哪些用户搜索了(没有去重)[leo, leo1, leo2, jack]
(2018-10-2_water,[leo, leo1, leo2, jack])
进行分组,获取每天每个搜索词,有哪些用户搜索了(没有去重)[leo, leo1]
(2018-10-1_food,[leo, leo1])
进行分组,获取每天每个搜索词,有哪些用户搜索了(没有去重)[leo1, leo2, leo3]
(2018-10-2_seafood,[leo1, leo2, leo3])
进行分组,获取每天每个搜索词,有哪些用户搜索了(没有去重)[leo, leo1, leo2, jack, jack1]
(2018-10-1_water,[leo, leo1, leo2, jack, jack1])
进行分组,获取每天每个搜索词,有哪些用户搜索了(没有去重)[leo2]
(2018-10-1_meat,[leo2])
进行分组,获取每天每个搜索词,有哪些用户搜索了(没有去重)[leo, leo1, leo2]
(2018-10-1_seafood,[leo, leo1, leo2])
进行分组,获取每天每个搜索词,有哪些用户搜索了(没有去重)[leo1, leo2]

(2018-10-2_food,[leo1, leo2])

对每天每个搜索词的搜索用户  去重操作  获得前uv
(2018-10-2_meat,1)
对每天每个搜索词的搜索用户  去重操作  获得前uv
(2018-10-2_water,4)
对每天每个搜索词的搜索用户  去重操作  获得前uv
(2018-10-1_food,2)
对每天每个搜索词的搜索用户  去重操作  获得前uv
(2018-10-2_seafood,3)
对每天每个搜索词的搜索用户  去重操作  获得前uv
(2018-10-1_water,5)
对每天每个搜索词的搜索用户  去重操作  获得前uv
(2018-10-1_meat,1)
对每天每个搜索词的搜索用户  去重操作  获得前uv
(2018-10-1_seafood,3)
对每天每个搜索词的搜索用户  去重操作  获得前uv

(2018-10-2_food,2)

窗函数操作
(2018-10-1,water_5)
开窗函数操作
(2018-10-1,seafood_3)
开窗函数操作
(2018-10-1,food_2)
开窗函数操作
(2018-10-2,water_4)
开窗函数操作
(2018-10-2,seafood_3)
开窗函数操作

(2018-10-2,food_2)

最终结果

[2018-10-1,water,5]
[2018-10-1,seafood,3]

[2018-10-1,food,2]

[2018-10-2,water,4]
[2018-10-2,seafood,3]
[2018-10-2,food,2]

每日top3热点搜索词统计案例相关推荐

  1. 通过Spark Streaming的window操作实战模拟热点搜索词案例实战

    本博文主要内容包括: 1.在线热点搜索词实现解析 2.SparkStreaming 利用reduceByKeyAndWindow实现在线热点搜索词实战 一:在线热点搜索词实现解析 背景描述:在社交网络 ...

  2. reduceByKeyAndWindow基于滑动窗口的热点搜索词实时统计(Scala版本)

    package SparkStreaming import org.apache.spark.SparkConf import org.apache.spark.streaming.{Seconds, ...

  3. 拼多多搜索词统计 API接口操作展示说明

    通过次接口 item_search_data可以获取搜索词统计,同时了解到该店铺的(num_iid商品ID.title商品标题.price价格.orginal_price原价.nick卖家昵称.num ...

  4. 拼多多API—获取商品详情、关键词获取取商品、获得搜索词统计获得搜索词推荐

    1.item_get - 根据ID取商品详情: 返回数据部分截图: 点击免费测试 2.item_search - 根据关键词取商品列表: 返回数据部分截图: 3.item_search_data - ...

  5. spark 项目应用-topn搜索热词统计

    本文章通过spark读取hive数据,分析top热点搜索词 import java.util.ArrayList; import java.util.Arrays; import java.util. ...

  6. 《Spark SQL大数据实例开发》9.2 综合案例实战——电商网站搜索排名统计

    <Spark SQL大数据实例开发>9.2 综合案例实战--电商网站搜索排名统计 9.2.1 案例概述     本节演示一个网站搜索综合案例:以京东为例,用户登录京东网站,在搜索栏中输入搜 ...

  7. 73、Spark SQL之开窗函数以及top3销售额统计案例实战

    开窗函数以及top3销售额统计案例实战 Spark 1.4.x版本以后,为Spark SQL和DataFrame引入了开窗函数,比如最经典,最常用的,row_number(),可以让我们实现分组取to ...

  8. 百度统计出现不属于自己网站的域名搜索词及数据怎么处理

    百度统计出现不属于自己网站的域名搜索词及数据怎么处理? 这种情况多半是你网站的百度统计代码别别人利用了,建议你找程序将百度统计代码加密传输这是一种解决方式 另一种设置网站黑白名单,进入百度统计管理界面 ...

  9. 百度统计搜索词是广告怎么办?建议设置过滤规则排除

    最近经常在百度统计中看到有很多垃圾广告的搜索词,甚至连 Top 搜索词.Top 来源网站.Top 入口页面中都出现很多垃圾广告.比如老古的一个测试网站就出现这种情况,具体如下图所示: 据了解,网络上有 ...

最新文章

  1. 牛客网_剑指Offer_Python实现_更新中
  2. Matlab创建特殊数组学习
  3. python opencv 等比例调整(缩放)图片分辨率大小代码 cv2.resize()
  4. python 为什么要用astype()函数对numpy数据类型进行转换,而不直接指定其dtype?float(64) float(32) int(64) int(32)(转换为整型int)
  5. 广搜--(搜索的第一道题)图像有用区域
  6. rsync推拉模型及结合inotify实现推模型自动同步
  7. 普里姆从不同顶点出发_来自三个不同聚类分析的三个不同教训数据科学的顶点...
  8. leetcode49. 字母异位词分组
  9. 【kafka】kafka Producer Metadata概述及源码分析
  10. Alibaba Druid 源码阅读(一) 数据库连接池初步
  11. 关于RestTemplate的小笔记
  12. 可行性分析报告 模板
  13. 还在用 ZXing ? 试试华为统一扫码服务吧!
  14. 将Excel数据导入到MySQL数据库
  15. python导入包总是失败
  16. android9.0自动使用24小时制
  17. 安卓手机拨号键盘隐藏工程代码大全
  18. 在ArcGIS中快速搭建三维场景教程(从数据获取到软件制作)
  19. python三维图能画地图_使用Python绘制地图的三大秘密武器
  20. 计算机组成原理 CPU 结构和功能

热门文章

  1. vscode python 自动补全_利用CodeBERT,这个VS Code扩展可以自动生成Python文档字符串...
  2. Eclipse下Tomcat服务器配置和使用
  3. P4721 【模板】分治 FFT
  4. 【雅礼集训2017】字符串【后缀自动机】【数据分治】
  5. 【CF1189D】Add on a Tree【结论】【构造】
  6. 牛客题霸 [滑动窗口的最大值] C++题解/答案
  7. CF1472(div3):总结
  8. P5371-[SNOI2019]纸牌【矩阵乘法】
  9. P4100-[HEOI2013]钙铁锌硒维生素【矩阵求逆,最大匹配】
  10. nssl1249-C【数论】