2021年大数据Spark(二十一):Spark Core案例-SogouQ日志分析
目录
案例-SogouQ日志分析
业务需求
准备工作
HanLP 中文分词
样例类 SogouRecord
业务实现
搜索关键词统计
用户搜索点击统计
搜索时间段统计
完整代码
案例-SogouQ日志分析
使用搜狗实验室提供【用户查询日志(SogouQ)】数据,使用Spark框架,将数据封装到RDD中进行业务数据处理分析。数据网址:http://www.sogou.com/labs/resource/q.php
1)、数据介绍:搜索引擎查询日志库设计为包括约1个月(2008年6月)Sogou搜索引擎部分网页查询需求及用户点击情况的网页查询日志数据集合。
2)、数据格式
访问时间\t用户ID\t[查询词]\t该URL在返回结果中的排名\t用户点击的顺序号\t用户点击的URL
用户ID是根据用户使用浏览器访问搜索引擎时的Cookie信息自动赋值,即同一次使用浏览器输入的不同查询对应同一个用户ID
3)、数据下载:分为三个数据集,大小不一样
迷你版(样例数据, 376KB):http://download.labs.sogou.com/dl/sogoulabdown/SogouQ/SogouQ.mini.zip
精简版(1天数据,63MB):http://download.labs.sogou.com/dl/sogoulabdown/SogouQ/SogouQ.reduced.zip
完整版(1.9GB):http://www.sogou.com/labs/resource/ftp.php?dir=/Data/SogouQ/SogouQ.zip
业务需求
针对SougoQ用户查询日志数据中不同字段,不同业务进行统计分析:
使用SparkContext读取日志数据,封装到RDD数据集中,调用Transformation函数和Action函数处理分析,灵活掌握Scala语言编程。
准备工作
在编程实现业务功能之前,首先考虑如何对【查询词】进行中文分词及将日志数据解析封装。
HanLP 中文分词
使用比较流行好用中文分词:HanLP,面向生产环境的自然语言处理工具包,HanLP 是由一系列模型与算法组成的 Java 工具包,目标是普及自然语言处理在生产环境中的应用。
官方网站:http://www.hanlp.com/,添加Maven依赖
<dependency><groupId>com.hankcs</groupId><artifactId>hanlp</artifactId><version>portable-1.7.7</version></dependency>
演示范例:HanLP 入门案例,基本使用
package cn.itcast.coreimport java.utilimport com.hankcs.hanlp.HanLP
import com.hankcs.hanlp.seg.common.Term
import com.hankcs.hanlp.tokenizer.StandardTokenizerimport scala.collection.JavaConverters._/*** HanLP 入门案例,基本使用*/
object HanLPTest {def main(args: Array[String]): Unit = {// 入门Demoval terms: util.List[Term] = HanLP.segment("杰克奥特曼全集视频")println(terms)println(terms.asScala.map(_.word.trim))// 标准分词val terms1: util.List[Term] = StandardTokenizer.segment("放假++端午++重阳")println(terms1)println(terms1.asScala.map(_.word.replaceAll("\\s+", "")))val words: Array[String] ="""00:00:00 2982199073774412 [360安全卫士] 8 3 download.it.com.cn/softweb/software/firewall/antivirus/20067/17938.html""".split("\\s+")println(words(2).replaceAll("\\[|\\]", ""))//将"["和"]"替换为空""}}
样例类 SogouRecord
将每行日志数据封装到CaseClass样例类SogouRecord中,方便后续处理:
/*** 用户搜索点击网页记录Record* @param queryTime 访问时间,格式为:HH:mm:ss* @param userId 用户ID* @param queryWords 查询词* @param resultRank 该URL在返回结果中的排名* @param clickRank 用户点击的顺序号* @param clickUrl 用户点击的URL*/
case class SogouRecord(queryTime: String, userId: String, queryWords: String, resultRank: Int, clickRank: Int, clickUrl: String )
业务实现
先读取数据,封装到SougoRecord类中,再按照业务处理数据。
最后也可以将分析的结果存储到MySQL表中。
读取数据
构建SparkContext实例对象,读取本次SogouQ.sample数据,封装到SougoRecord中 。
object SogouQueryAnalysis {def main(args: Array[String]): Unit = {val sparkConf: SparkConf = new SparkConf().setAppName(this.getClass.getSimpleName.stripSuffix("$")).setMaster("local[*]")val sc: SparkContext = new SparkContext(sparkConf)sc.setLogLevel("WARN")// TODO: 1. 本地读取SogouQ用户查询日志数据val rawLogsRDD: RDD[String] = sc.textFile("data/input/SogouQ.sample")//println(s"Count = ${rawLogsRDD.count()}")// TODO: 2. 解析数据,封装到CaseClass样例类中val recordsRDD: RDD[SogouRecord] = rawLogsRDD// 过滤不合法数据,如null,分割后长度不等于6.filter(log => log != null && log.trim.split("\\s+").length == 6)// 对每个分区中数据进行解析,封装到SogouRecord.mapPartitions(iter => {iter.map(log => {val arr: Array[String] = log.trim.split("\\s+")SogouRecord(arr(0),arr(1),arr(2).replaceAll("\\[|\\]", ""),arr(3).toInt,arr(4).toInt,arr(5))})})println(s"Count = ${recordsRDD.count()},\nFirst = ${recordsRDD.first()}")// 数据使用多次,进行缓存操作,使用count触发recordsRDD.persist(StorageLevel.MEMORY_AND_DISK).count()
搜索关键词统计
获取用户【查询词】,使用HanLP进行分词,按照单词分组聚合统计出现次数,类似WordCount程序,具体代码如下:
// =================== 3.1 搜索关键词统计 ===================
// a. 获取搜索词,进行中文分词
val wordsRDD: RDD[String] = recordsRDD.mapPartitions(iter => {iter.flatMap(record => {// 使用HanLP中文分词库进行分词val terms: util.List[Term] = HanLP.segment(record.queryWords.trim)// 将Java中集合对转换为Scala中集合对象import scala.collection.JavaConverters._terms.asScala.map(_.word)})
})
println(s"Count = ${wordsRDD.count()}, Example = ${wordsRDD.take(5).mkString(",")}")// b. 统计搜索词出现次数,获取次数最多Top10
val top10SearchWords: Array[(Int, String)] = wordsRDD.map((_, 1)) // 每个单词出现一次.reduceByKey(_ + _) // 分组统计次数.map(_.swap).sortByKey(ascending = false) // 词频降序排序.take(10) // 获取前10个搜索词
top10SearchWords.foreach(println)
运行结果如下:
用户搜索点击统计
统计出每个用户每个搜索词点击网页的次数,可以作为搜索引擎搜索效果评价指标。先按照用户ID分组,再按照【查询词】分组,最后统计次数,求取最大次数、最小次数及平均次数。
// =================== 3.2 用户搜索点击次数统计 ===================
/*每个用户在搜索引擎输入关键词以后,统计点击网页数目,反应搜索引擎准确度先按照用户ID分组,再按照搜索词分组,统计出每个用户每个搜索词点击网页个数*/
val clickCountRDD: RDD[((String, String), Int)] = recordsRDD.map(record => {// 获取用户ID和搜索词val key = (record.userId, record.queryWords)(key, 1)})// 按照用户ID和搜索词组合的Key分组聚合.reduceByKey(_ + _)clickCountRDD.sortBy(_._2, ascending = false).take(10).foreach(println)println(s"Max Click Count = ${clickCountRDD.map(_._2).max()}")
println(s"Min Click Count = ${clickCountRDD.map(_._2).min()}")
println(s"Avg Click Count = ${clickCountRDD.map(_._2).mean()}")
程序运行结果如下:
搜索时间段统计
按照【访问时间】字段获取【小时:分钟】,分组统计各个小时段用户查询搜索的数量,进一步观察用户喜欢在哪些时间段上网,使用搜狗引擎搜索,代码如下:
// =================== 3.3 搜索时间段统计 ===================
/*从搜索时间字段获取小时,统计个小时搜索次数*/
val hourSearchRDD: RDD[(String, Int)] = recordsRDD// 提取小时和分钟.map(record => {// 03:12:50record.queryTime.substring(0, 5)})// 分组聚合.map((_, 1)) // 每个单词出现一次.reduceByKey(_ + _) // 分组统计次数.sortBy(_._2, ascending = false)
hourSearchRDD.foreach(println)
程序运行结果如下:
完整代码
业务实现完整代码SogouQueryAnalysis如下所示:
package cn.itcast.coreimport java.utilimport com.hankcs.hanlp.HanLP
import com.hankcs.hanlp.seg.common.Term
import org.apache.spark.rdd.RDD
import org.apache.spark.storage.StorageLevel
import org.apache.spark.{SparkConf, SparkContext}/*** 用户查询日志(SogouQ)分析,数据来源Sogou搜索引擎部分网页查询需求及用户点击情况的网页查询日志数据集合。* 1. 搜索关键词统计,使用HanLP中文分词* 2. 用户搜索次数统计* 3. 搜索时间段统计* 数据格式:* 访问时间\t用户ID\t[查询词]\t该URL在返回结果中的排名\t用户点击的顺序号\t用户点击的URL* 其中,用户ID是根据用户使用浏览器访问搜索引擎时的Cookie信息自动赋值,即同一次使用浏览器输入的不同查询对应同一个用户ID*/
object SogouQueryAnalysis {def main(args: Array[String]): Unit = {val sparkConf: SparkConf = new SparkConf().setAppName(this.getClass.getSimpleName.stripSuffix("$")).setMaster("local[*]")val sc: SparkContext = new SparkContext(sparkConf)sc.setLogLevel("WARN")// TODO: 1. 本地读取SogouQ用户查询日志数据val rawLogsRDD: RDD[String] = sc.textFile("data/input/SogouQ.sample")//val rawLogsRDD: RDD[String] = sc.textFile("D:/data/sogou/SogouQ.reduced")//println(s"Count = ${rawLogsRDD.count()}")// TODO: 2. 解析数据,封装到CaseClass样例类中val recordsRDD: RDD[SogouRecord] = rawLogsRDD// 过滤不合法数据,如null,分割后长度不等于6.filter(log => log != null && log.trim.split("\\s+").length == 6)// 对每个分区中数据进行解析,封装到SogouRecord.mapPartitions(iter => {iter.map(log => {val arr: Array[String] = log.trim.split("\\s+")SogouRecord(arr(0),arr(1),arr(2).replaceAll("\\[|\\]", ""),arr(3).toInt,arr(4).toInt,arr(5))})})println("====解析数据===")println(s"Count = ${recordsRDD.count()},\nFirst = ${recordsRDD.first()}")// 数据使用多次,进行缓存操作,使用count触发recordsRDD.persist(StorageLevel.MEMORY_AND_DISK).count()// TODO: 3. 依据需求统计分析/*1. 搜索关键词统计,使用HanLP中文分词2. 用户搜索次数统计3. 搜索时间段统计*/println("====3.1 搜索关键词统计===")// =================== 3.1 搜索关键词统计 ===================// a. 获取搜索词,进行中文分词val wordsRDD: RDD[String] = recordsRDD.mapPartitions(iter => {iter.flatMap(record => {// 使用HanLP中文分词库进行分词val terms: util.List[Term] = HanLP.segment(record.queryWords.trim)// 将Java中集合对转换为Scala中集合对象import scala.collection.JavaConverters._terms.asScala.map(_.word)})})//println(s"Count = ${wordsRDD.count()}, Example = ${wordsRDD.take(5).mkString(",")}")// b. 统计搜索词出现次数,获取次数最多Top10val top10SearchWords: Array[(Int, String)] = wordsRDD.map((_, 1)) // 每个单词出现一次.reduceByKey(_ + _) // 分组统计次数.map(_.swap).sortByKey(ascending = false) // 词频降序排序.take(10) // 获取前10个搜索词top10SearchWords.foreach(println)println("====3.2 用户搜索点击次数统计===")// =================== 3.2 用户搜索点击次数统计 ===================/*每个用户在搜索引擎输入关键词以后,统计点击网页数目,反应搜索引擎准确度先按照用户ID分组,再按照搜索词分组,统计出每个用户每个搜索词点击网页个数*/val clickCountRDD: RDD[((String, String), Int)] = recordsRDD.map(record => {// 获取用户ID和搜索词val key = (record.userId, record.queryWords)(key, 1)})// 按照用户ID和搜索词组合的Key分组聚合.reduceByKey(_ + _)clickCountRDD.sortBy(_._2, ascending = false).take(10).foreach(println)println(s"Max Click Count = ${clickCountRDD.map(_._2).max()}")println(s"Min Click Count = ${clickCountRDD.map(_._2).min()}")println(s"Avg Click Count = ${clickCountRDD.map(_._2).mean()}")println("====3.3 搜索时间段统计===")// =================== 3.3 搜索时间段统计 ===================/*从搜索时间字段获取小时,统计个小时搜索次数*/val hourSearchRDD: RDD[(String, Int)] = recordsRDD// 提取小时和分钟.map(record => {// 03:12:50record.queryTime.substring(0, 5)})// 分组聚合.map((_, 1)) // 每个单词出现一次.reduceByKey(_ + _) // 分组统计次数.sortBy(_._2, ascending = false)hourSearchRDD.foreach(println)// 释放缓存数据recordsRDD.unpersist()// 应用结束,关闭资源sc.stop()}/*** 用户搜索点击网页记录Record** @param queryTime 访问时间,格式为:HH:mm:ss* @param userId 用户ID* @param queryWords 查询词* @param resultRank 该URL在返回结果中的排名* @param clickRank 用户点击的顺序号* @param clickUrl 用户点击的URL*/case class SogouRecord(queryTime: String,userId: String,queryWords: String,resultRank: Int,clickRank: Int,clickUrl: String)}
2021年大数据Spark(二十一):Spark Core案例-SogouQ日志分析相关推荐
- 大数据Spark(二十一):Spark Core案例-SogouQ日志分析
文章目录 案例-SogouQ日志分析 业务需求 准备工作 HanLP 中文分词
- 2021年大数据Kafka(十一):❤️Kafka的消费者负载均衡机制和数据积压问题❤️
全网最详细的大数据Kafka文章系列,强烈建议收藏加关注! 新文章都已经列出历史文章目录,帮助大家回顾前面的知识重点. 目录 系列历史文章 Kafka的消费者负载均衡机制和数据积压问题 一.kafka ...
- 2021年大数据HBase(十一):Apache Phoenix的视图操作
全网最详细的大数据HBase文章系列,强烈建议收藏加关注! 新文章都已经列出历史文章目录,帮助大家回顾前面的知识重点. 目录 系列历史文章 前言 Apache Phoenix的视图操作 一.应用场景 ...
- 2021年大数据Hive(十一):Hive调优
全网最详细的大数据Hive文章系列,强烈建议收藏加关注! 新文章都已经列出历史文章目录,帮助大家回顾前面的知识重点. 目录 系列历史文章 前言 Hive调优 一.本地模式 1.空key处理 二.SQL ...
- 2021年大数据ELK(一):集中式日志协议栈Elastic Stack简介
全网最详细的大数据ELK文章系列,强烈建议收藏加关注! 新文章都已经列出历史文章目录,帮助大家回顾前面的知识重点. 目录 系列历史文章 一.简介 二.ELK 协议栈介绍及体系结构 三.集中式日志协议栈 ...
- 2021年大数据Hadoop(十一):HDFS的元数据辅助管理
2021大数据领域优质创作博客,带你从入门到精通,该博客每天更新,逐渐完善大数据各个知识体系的文章,帮助大家更高效学习. 有对大数据感兴趣的可以关注微信公众号:三帮大数据 目录 HDFS的元数据辅助管 ...
- 2021年大数据ELK(十一):Elasticsearch架构原理
全网最详细的大数据ELK文章系列,强烈建议收藏加关注! 新文章都已经列出历史文章目录,帮助大家回顾前面的知识重点. 目录 Elasticsearch架构原理 一.Elasticsearch的节点类型 ...
- 2021年大数据Flink(十一):流批一体API Source
目录 Source 预定义Source 基于集合的Source 基于文件的Source 基于Socket的Source 自定义Source 随机生成数据 MySQL Sou ...
- 2021年大数据Hadoop(二十五):YARN通俗介绍和基本架构
全网最详细的Hadoop文章系列,强烈建议收藏加关注! 后面更新文章都会列出历史文章目录,帮助大家回顾知识重点. 目录 本系列历史文章 前言 YARN通俗介绍和基本架构 Yarn通俗介绍 Yarn基本 ...
最新文章
- Java中 与,||与|的区别
- RPC框架原理及从零实现系列博客(二):11个类实现简单RPC框架
- 并发工具类:CountDownLatch、CyclicBarrier、Semaphore
- POJ - 3581 Sequence(后缀数组)
- 配置bond和vlan
- cors跨域_Spring Boot 中通过 CORS 解决跨域问题
- java路径怎么找_Java路径怎么找
- 高智商孩子14个独有的特点
- LeetCode 321. 拼接最大数(单调栈)*
- 重新认识访问者模式:从实践到本质
- centos 文件夹网络连接_CentOS的网络配置的命令详解
- Linux系统中用源代码编译安装软件和查看进程
- vs2015安装msdn_vs2015离线版msdn下载|
- Json对象和Json字符串的区别
- php laravel 中文手册,Laravel 5.6 中文离线手册文档(兼容5.5)(PDF版)
- 一线明星纷纷失业,数据告诉你今年的演员有多难
- 靶机渗透练习13-hackme1
- RHEL7-team双网卡绑定
- 南尼U盘修复——坑die专属
- Dead Connection Detection(DCD)