场景描述:这是一个Spark的实战题目,也是在面试中经常出现的一类题目。

问题描述

对于一个大型网站,用户访问量尝尝高达数十亿。对于数十亿是一个什么样的概念,我们这里可以简单的计算一下。对于一个用户,单次访问,我们通常会记录下哪些数据呢?

1、用户的id
2、用户访问的时间
3、用户逗留的时间
4、用户执行的操作
5、用户的其余数据(比如IP等等)

我们单单从用户id来说,比如10011802330414,这个ID,那么我们一个id差不多就是一个long类型,因为在大量数据存储的时候,我们都是采用文本存储。因此对于5亿个用户ID,完全存储在磁盘当中,大概是5G的大小,对于这个大小,并不能算是大数据。但是对于一个案例来说,已经非常足够了。

我们会产生一个5亿条ID的数据集,我们上面说到,这个数据集大小为5G(不压缩的情况下),因此我不会在GitHub上上传这样一个数据集,但是我们提供一个方法,来生成一个5亿条数据。

当然要解决这个问题,你可以依然在local模式下运行项目,但是你得有足够的磁盘空间和内存空间,大概8G磁盘空间(因为除了数据本身,spark运行过程还要产生一些临时数据),5G内存(要进行reduceByKey)。为了真正展示spark的特性,我们这个案例,将会运行在spark集群上。

关于如何搭建集群,我准备在后续的章节补上。但是在网上有大量的集群搭建教程,其中不乏一些详细优秀的教程。当然,这节我们不讲如何搭建集群,但是我们仍然可以开始我们的案例。

问题分析
那么现在我们拥有了一个5亿条数据(实际上这个数据并不以文本存储,而是在运行的时候生成),从五亿条数据中,找出访问次数最多的人,这看起来并不难。但实际上我们想要通过这个案例了解spark的真正优势。

5亿条ID数据,首先可以用map将其缓存到RDD中,然后对RDD进行reduceByKey,最后找出出现最多的ID。思路很简单,因此代码量也不会很多。

实现

scala实现
首先是ID生成方法:
RandomId.class

import org.apache.spark.{SparkConf, SparkContext}object ActiveVisitor {def main(args: Array[String]): Unit = {val conf = new SparkConf().setMaster("spark://master:7077").setAppName("ActiveVisitor")val sc = new SparkContext(conf)//生成一个0-9999的列表val list = 1 until 10000val id =new RandomId()//这里记录最大的次数var max = 0//这里记录最大次数的IDvar maxId = 0Lval lastNum = sc.parallelize(list)//第一步生成5亿条数据.flatMap(num => {//遍历list列表//总共遍历1万次每次生成5万个IDvar list2 = List(id.next())for (i <- 1 to 50000){list2 = id.next() :: list2}//这里记录当前生成ID的百分比println(num/1000.0 +"%")//返回生成完成后的list//每次循环里面都包含5万个IDlist2})//遍历5亿条数据//为每条数据出现标记1.map((_,1))//对标记后的数据进行处理//得到每个ID出现的次数,即(ID,Count).reduceByKey(_+_)//遍历处理后的数据.foreach(x => {//将最大值存储在max中if (x._2 > max){max = x._2maxId = x._1//若X比之前记录的值大,则输出该id和次数//最后一次输出结果,则是出现次数最多的的ID和以及其出现的次数//当然出现次数最多的可能有多个ID//这里只输出一个println(x)}})}}

然后是用它生成5亿条数据

import org.apache.spark.{SparkConf, SparkContext}object ActiveVisitor {def main(args: Array[String]): Unit = {val conf = new SparkConf().setMaster("spark://master:7077").setAppName("ActiveVisitor")val sc = new SparkContext(conf)val list = 1 until 100000val id =new RandomId()var max = 0var maxId = 0Lval lastNum = sc.parallelize(list).flatMap(num => {var list2 = List(id.next())for (i <- 1 to 50000){list2 = id.next() :: list2}println(num +"%")list2}).map((_,1)).reduceByKey(_+_).foreach(x => {if (x._2 > max){max = x._2maxId = x._1println(x)}})}
}

处理5亿条数据

import org.apache.spark.{SparkConf, SparkContext}object ActiveVisitor {def main(args: Array[String]): Unit = {val conf = new SparkConf().setMaster("spark://master:7077").setAppName("ActiveVisitor")val sc = new SparkContext(conf)//生成一个0-9999的列表val list = 1 until 10000val id =new RandomId()//这里记录最大的次数var max = 0//这里记录最大次数的IDvar maxId = 0Lval lastNum = sc.parallelize(list)//第一步生成5亿条数据.flatMap(num => {//遍历list列表//总共遍历1万次每次生成5万个IDvar list2 = List(id.next())for (i <- 1 to 50000){list2 = id.next() :: list2}//这里记录当前生成ID的百分比println(num/1000.0 +"%")//返回生成完成后的list//每次循环里面都包含5万个IDlist2})//遍历5亿条数据//为每条数据出现标记1.map((_,1))//对标记后的数据进行处理//得到每个ID出现的次数,即(ID,Count).reduceByKey(_+_)//遍历处理后的数据.foreach(x => {//将最大值存储在max中if (x._2 > max){max = x._2maxId = x._1//若X比之前记录的值大,则输出该id和次数//最后一次输出结果,则是出现次数最多的的ID和以及其出现的次数//当然出现次数最多的可能有多个ID//这里只输出一个println(x)}})}
}

运行得到结果
将其提交到spark上运行,观察日志

1%
5000%
2%
5001%
3%
5002%
4%
5003%
5%
5004%
6%
5005%
7%
5006%
8%
5007%
9%
5008%
10%
5009%
11%
5010%
12%
5011%
5012%
13%
5013%
14%
15%
5014%...
...
...

这里是输出的部分日志,从日志中,我们显然发现,程序是并行的。我采用的集群由四个节点组成,每个节点提供5G的内存空间,集群在不同节点中运行,有节点分配到的分区是从1开始,而有节点则是从5000开始,因此程序并没有按照我们所想的从1%-9999%。好在未按照顺序执行,也并不影响最终结果,毕竟最终要进行一个reduceByKey,才是我们真正需要得到结果的地方。
再看日志另一部分:

5634%
5635%
5636%
5637%
5638%
5639%
5640%
5641%
5642%
5643%
5644%
5645%
2019-03-05 11:52:14 INFO  ExternalSorter:54 - Thread 63 spilling in-memory map of 1007.3 MB to disk (2 times so far)
647%
648%
649%
650%
651%
652%
653%
654%
655%
656%

注意到这里,spilling in-memory map of 1007.3 MB to disk,spilling操作将map中的 1007.3 MB的数据溢写到磁盘中。这是由于spark在处理的过程中,由于数据量过于庞大,因此将多的数据溢写到磁盘,当再次用到时,会从磁盘读取。对于实时性操作的程序来说,多次、大量读写磁盘是绝对不被允许的。但是在处理大数据中,溢写到磁盘是非常常见的操作。

事实上,在完整的日志中,我们可以看到有相当一部分日志是在溢写磁盘的时候生成的,大概49次(这是我操作过程中的总数)
如图:

总共出现49条溢写操作的日志,每次大概是1G,这也印证了我们5亿条数据,占据空间5G的一个说法。事实上,我曾将这5亿条数据存储在磁盘中,的确其占据的空间是5G左右。

结果

最终,我们可以在日志中看到结果。

整个过程持续了将近47min,当然在庞大的集群中,时间能够大大缩短,要知道,我们现在只采用了4个节点。

我们看到了次数2、4、6、8居然分别出现了两次,这并不奇怪,因为集群并行运行,异步操作,出现重复结果十分正常,当然我们也可以用并发机制,去处理这个现象。这个在后续的案例中,我们会继续优化结果。

从结果上看,我们发现5亿条数据中,出现最多的ID也仅仅出现了8次,这说明了在大量数据中,很多ID可能只出现了1次、2次。这也就是为什么最后我采用的是foreach方法去寻找最大值,而不采用如下的方法

import org.apache.spark.{SparkConf, SparkContext}object ActiveVisitor {def main(args: Array[String]): Unit = {val conf = new SparkConf().setMaster("spark://master:7077").setAppName("ActiveVisitor")val sc = new SparkContext(conf)//生成一个0-9999的列表val list = 1 until 10000val id =new RandomId()//这里记录最大的次数var max = 0//这里记录最大次数的IDvar maxId = 0Lval lastNum = sc.parallelize(list)//第一步生成5亿条数据.flatMap(num => {//遍历list列表//总共遍历1万次每次生成5万个IDvar list2 = List(id.next())for (i <- 1 to 50000){list2 = id.next() :: list2}//这里记录当前生成ID的百分比println(num/1000.0 +"%")//返回生成完成后的list//每次循环里面都包含5万个IDlist2})//遍历5亿条数据//为每条数据出现标记1.map((_,1))//对标记后的数据进行处理//得到每个ID出现的次数,即(ID,Count).reduceByKey(_+_)//为数据进行排序//倒序.sortByKey(false)//次数最多的,在第一个,将其输出println(lastNum.first())}
}

这个方法中,我们对reduceByKey结果进行排序,输出排序结果的第一个,即次数最大的ID。这样做似乎更符合我们的要求。但是实际上,为了得到同样的结果,这样做,会消耗更多的资源。如我们所说,很多ID启其实只出现了一次,两次,排序的过程中,仍然要对其进行排序。要知道,由于很多ID只出现一次,排序的数据集大小很有可能是数亿的条目。

根据我们对排序算法的了解,这样一个庞大数据集进行排序,势必要耗费大量资源。因此,我们能够容忍输出一些冗余信息,但不影响我们的得到正确结果。

至此,我们完成了5亿数据中,找出最多出现次数的数据。如果感兴趣,可以尝试用这个方法解决50亿条数据,出现最多的数据条目。但是这样做的话,你得准备好50G的空间。尽管用上述的程序,属于阅后即焚,但是50亿数据仍然会耗费大量的时间。

Spark的实战题目——寻找5亿次访问中,访问次数最多的人相关推荐

  1. 面试题:寻找一个字符串中出现次数最多的字符以及出现的次数

    要求编写代码实现:寻找一个字符串中出现次数最多的字符以及出现的次数. 解法一:用删除法实现 (挺巧妙的一种) public class FindTheMostAppearChar {public st ...

  2. lista=['a','abc','d','abc','fgi','abf'],寻找列表中出现次数最多的第一个 字母,出现了几次

    思路: 1.建一个新的字符串lista_s,用来存放列表lista中每个元素的第一个字母 2.建立一个字典lista_dict,key是字符串lista_s的元素,value是字符串lista_s对应 ...

  3. 从10亿个数字中找出最大的前100个数

    先拿10000个数建堆,然后一次添加剩余元素,如果大于堆顶的数(10000中最小的),将这个数替换堆顶,并调整结构使之仍然是一个最小堆,这样,遍历完后,堆中的10000个数就是所需的最大的10000个 ...

  4. [笔试题目] 简单总结笔试和面试中的海量数据问题

    最近在笔试和面试中遇到了很多关于海量数据的问题,在此进行简单的记录,写一篇方便自己下次学习的处理海量数据的文章及在线笔记,同时也希望对你有所帮助.当然,海量数据最出名的还是七月July,但这里我是想直 ...

  5. 【面试被虐】如何只用2GB内存从20亿,40亿,80亿个整数中找到出现次数最多的数?...

    这几天小秋去面试了,不过最近小秋学习了不少和位算法相关文章,例如 [面试现场]如何判断一个数是否在40亿个整数中? [算法技巧]位运算装逼指南 对于算法题还是有点信心的,,,,于是,发现了如下对话. ...

  6. 如何只用2GB内存从20/40/80亿个整数中找到出现次数最多的数

    来源:公众号[苦逼的码农] 这几天小秋去面试了,不过最近小秋学习了不少和位算法相关文章,例如: [算法技巧]位运算装逼指南 对于算法题还是有点信心的,,,,于是,发现了如下对话. 20亿级别 面试官: ...

  7. 从1亿个ip中找出访问次数最多的IP

    看了教你如何迅速秒杀掉:99%的海量数据处理面试题一文,的确是挺有收获的,特别是对这种海量数据的处理,的确是有了一个挺清晰的思路,特别感谢原文博主July. 处理海量数据问题存在的原因就在于1)数据量 ...

  8. 【面试被虐】如何只用2GB内存从20亿,40亿,80亿个整数中找到出现次数最多的数?

    这几天小秋去面试了,不过最近小秋学习了不少和位算法相关文章,例如 [面试现场]如何判断一个数是否在40亿个整数中? [算法技巧]位运算装逼指南 对于算法题还是有点信心的,,,,于是,发现了如下对话. ...

  9. 只用2GB的内存找出20亿个整数中找到出现次数最多的数

    要求有一个包含20亿个32位整数的文件,从中找到出现次数最多的数. 首先先分析一下,32位int类型的数占4B,20亿个4B 约为 8GB,只用2GB肯定不够.所以我们肯定需要将这20亿个数哈希到不同 ...

最新文章

  1. [转]c++之菱形继承
  2. リアルタイム3Dニャンニャン 汉化补丁
  3. Programming Computer Vision with Python (学习笔记七)
  4. 实现图片验证码,其实就是简单的验证码实现,记录一下
  5. 比特币base58源码解析_中本聪源码早期版本流出:区块链原名时间链,比特币内置虚拟扑克游戏...
  6. linux c 指针数组定义数组长度,C/C++指针数组和 迪士尼源码搭建下载 数组指针...
  7. python用装饰器实现缓存函数执行结果
  8. 使用Anaconda3安装tensorflow,opencv,使其可以在spyder中运行
  9. 20211130:力扣第267周周赛(下)
  10. java安全入门篇之接口验签(原创)
  11. android中工厂模式应用,抽象工厂模式在android中使用
  12. eclipse svn Subversive
  13. 汽车之家推荐系统排序算法迭代之路
  14. Word中封面怎么设置不显示页码?
  15. 分享5款堪称神器的免费软件,建议先收藏再下载
  16. 农场工具程序设计(三)
  17. pyqt5+pyinstaller图标ico制作说明
  18. 计算机启动时蓝屏后自动重起,电脑蓝屏_处理电脑开机蓝屏自动重启
  19. B站弹幕姬,弹幕礼物感谢,关注感谢,自动回复,房管工具,房管助手,基于java
  20. 如何制作Gif动态图

热门文章

  1. Silverlight中枚举并加载客户端程序集
  2. CodeForces - 1000C Covered Points Count(差分+思维)
  3. POJ - 2175 Evacuation Plan(最小费用最大流+消圈定理)
  4. MMDetection-简介
  5. Cloud Programming Simplified: A Berkerley View on Serverless Computing笔记
  6. HDU4416(后缀自动机)
  7. POJ3695(矩形切割中等题)
  8. 【网络编程】之四、socket网络编程例解
  9. 浅谈游戏自动寻路A*算法
  10. 战斗民族开源神器。ClickHouse为什么能够征服各个大厂?