Spark取出(Key,Value)型数据中Value值为前n条数据
Spark取出(Key,Value)型数据中Value值为前n条数据
最近在使用Spark进行一些日志分析,需要对日志中的一些(key,value)型数据进行排序,并取出value最多的10条数据。经过查找资料,发现Spark中的top()函数可以取出排名前n的元素,以及sortBy()函数可以对(key,value)数据根据value进行排序,原以为一切都很好解决,但是实际情况并没有得到想要的结果数据,研究了部分源码,才最终达到了想要的数据,特在此备注和分享。
前期遇到的坑
刚开始,通过查找资料,知道Spark可以使用sortByKey()和sortBy() 两个函数对(key,value)型数据排序。于是,直接使用sortByKey()进行排序,排完之后才发现,排序的时候是根据Key排序,而我需要先对Key进行汇总,再根据Value进行排序。显然,sortByKey不能满足需求!
于是,开始尝试使用sortBy()函数,使用方法为 rdd.sortBy(_._2,false),即可对value进行降序排序。在测试的时候,我使用了rdd.sortBy(_._2,false).collect()进行排序和汇总,但是collect()函数会将所有的数据汇总到Driver,当数据量太大时对导致Driver中的内存不足。于是,想着只取将10条数据返回给Driver。经过查找,知道top()函数可以取出前10条数据。
top()函数中的坑及其解决方法
后来,静下心来再看了几遍,终于发现不是太对了。我的数据类型是(key,value)格式的,rdd.sortBy(_._2,false)中实现了根据value值排序的目的,但是 .top(10) 却取出了key为前10的数据。top()函数源码中,对RDD中的数据进行了reduce操作,并将结果进行排序。所以,rdd.sortBy(_._2,false).top(10) 这段代码先是对(key,value)数据根据value进行排序,而top()函数中,数据又再次对key进行了排序,导致之前根绝value排序的结果乱序了,所以最后取到的是key排在前10的数据。这就是导致问题的原因,终于被我发现了!
问题虽然被发现了,但是怎么解决呢?说实话,我对Scala也不是太了解,只能去QQ群里请教了一些大神。有一位叫做老徐的大神帮我给出了解决方法: rdd.sortBy(_._2,false).top(10)(Ordering.by(e => e._2))。再次运行,果然能得到正确结果。后来再仔细想想,觉得sortBy()函数有点多余,于是变成rdd.top(10)(Ordering.by(e => e._2))。至此,已经能对(key,value)类型的数据进行汇总,然后根据value值进行排序,最后取出value排名前10的数据了。
take()函数实现目标
在请教大神的时候,偶然接触到了take()函数,经过测试: rdd.sortBy(_._2,false).take(10) 这段代码能得到value排名前10的数据。
查看take()函数的源码,如下:
- /**
- * Take the first num elements of the RDD. It works by first scanning one partition, and use the
- * results from that partition to estimate the number of additional partitions needed to satisfy
- * the limit.
- *
- * @note this method should only be used if the resulting array is expected to be small, as
- * all the data is loaded into the driver's memory.
- *
- * @note due to complications in the internal implementation, this method will raise
- * an exception if called on an RDD of `Nothing` or `Null`.
- */
- def take(num: Int): Array[T] = withScope {
- if (num == 0) {
- new Array[T](0)
- } else {
- val buf = new ArrayBuffer[T]
- val totalParts = this.partitions.length
- var partsScanned = 0
- while (buf.size < num && partsScanned < totalParts) {
- // The number of partitions to try in this iteration. It is ok for this number to be
- // greater than totalParts because we actually cap it at totalParts in runJob.
- var numPartsToTry = 1L
- if (partsScanned > 0) {
- // If we didn't find any rows after the previous iteration, quadruple and retry.
- // Otherwise, interpolate the number of partitions we need to try, but overestimate
- // it by 50%. We also cap the estimation in the end.
- if (buf.size == 0) {
- numPartsToTry = partsScanned * 4
- } else {
- // the left side of max is >=1 whenever partsScanned >= 2
- numPartsToTry = Math.max((1.5 * num * partsScanned / buf.size).toInt - partsScanned, 1)
- numPartsToTry = Math.min(numPartsToTry, partsScanned * 4)
- }
- }
- val left = num - buf.size
- val p = partsScanned.until(math.min(partsScanned + numPartsToTry, totalParts).toInt)
- val res = sc.runJob(this, (it: Iterator[T]) => it.take(left).toArray, p)
- res.foreach(buf ++= _.take(num - buf.size))
- partsScanned += p.size
- }
- buf.toArray
- }
- }
该函数的注解指出,take()函数通过扫描一个数据分区,并取出该分区中的前n个数据,避免了其它分区数据的检索。最主要的是,该函数没有对父RDD中的数据进行重新分区,所以,数据的分区和排序顺序并没有改变,因此能取出value排名前10的数据。
总结
经过上面的这些折腾,发现top()函数中所遇到的坑的实质是由于(key,value)数据在sortBy(_._2)函数中根据value进行排序的时候,会进行Shuffle操作,根据value值将原来的数据进行重新分区。而sortBy()对数据排序之后,在top()函数中进行排序时,会根据key进行Shuffle操作,并得到根据key排序和分区之后的新RDD,所以导致最后的结果跟预期的不一致。
只要肯花时间,自己的潜力还是可以被挖掘出来的!
Spark取出(Key,Value)型数据中Value值为前n条数据相关推荐
- 获取DataTable前几条数据
#region 获取DataTable前几条数据/// <summary>/// 获取DataTable前几条数据/// </summary>/// <param nam ...
- java list 前100个_实现java 中 list集合中有几十万条数据,每100条为一组取出
解决"java 中 list集合中有几十万条数据,每100条为一组取出来如何实现,求代码!!!"的问题. 具体解决方案如下: /** * 实现java 中 list集合中有几十万条 ...
- mysql查询每个id的前10条数据_解决 MySQL 比如我要拉取一个消息表中用户id为1的前10条最新数据...
我们都知道,各种主流的社交应用或者阅读应用,基本都有列表类视图,并且都有滑到底部加载更多这一功能, 对应后端就是分页拉取数据. 好处不言而喻,一般来说,这些数据项都是按时间倒序排列的,用户只关心最新的 ...
- java 获取json的值_Java如何获取JSON数据中的值
场景:在接口自动化场景中,下个接口发送的请求参数,依赖上个接口请求结果中的值.需要将获取值作为全局参数引用. import java.io.File; import java.io.FileInput ...
- Java如何获取JSON数据中的值 备忘
Java如何获取JSON数据中的值 取出JsonArray中的object(orderNo) 嵌套 JsonObeject { JsonObeject { JsonArray [Ob ...
- replace函数对dataframe中的值进行替换(所有数据列中的相同值)
replace函数对dataframe中的值进行替换(所有数据列中的相同值) 目录 replace函数对dataframe中的值进行替换(所有数据列中的相同值)
- 【SpringBoot项目中使用Mybatis批量插入百万条数据】
SpringBoot项目中使用Mybatis批量插入百万条数据 话不多说,直接上代码,测试原生批处理的效率 开始测试 背景:因为一些业务问题,需要做多数据源,多库批量查询.插入操作,所以就研究了一下. ...
- 6-1 简单快速排序分数 10作者 唐艳琴单位 中国人民解放军陆军工程大学本题要求实现一个函数,可快速查找给定x(保证是整个数据中其值存在,如果x有多个,查找第一个x)在整个数据中的排名(数据
6-1 简单快速排序 分数 10 全屏浏览题目 切换布局 作者 唐艳琴 单位 中国人民解放军陆军工程大学 本题要求实现一个函数,可快速查找给定x(保证是整个数据中其值存在,如果x有多个,查找第一个x) ...
- Vue父组件传子组件数据中,Vue监听不到数据改变
Vue父组件传子组件数据中,Vue监听不到数据改变 官方文档说明(引用来自官网) 检测变化的注意事项 由于 JavaScript 的限制,Vue 不能检测数组和对象的变化.尽管如此我们还是有一些办法来 ...
- mysql取分组数据中每个分组的最新一条数据
mysql取分组数据中每个分组的最新一条数据 select * from data td,(select max(id) id from log group by name) md where td. ...
最新文章
- Android --- 命名规范
- java mysql自动备份_java定时备份数据之二_MySQL
- eclipse中YAML文件编辑插件:Yaml Editor插件安装
- apache2.4.9 开启path_info访问_如何通过SSH访问NAS?
- 算法系列之选择排序算法
- 类型混淆漏洞实例浅析
- linux自动点击软件上的按钮有什么用,教会你Linux Shell自动交互的三种方法
- Json扩展 (转)
- 快递公司type字典
- MATLAB彩色图像处理
- oracle vm server for x86,扛起Xen的大旗!体验Oracle VM Server 3.4.6与基于OL7.7的OVM Manager...
- python用爬虫实现抢票_PythonGUI+爬虫-从零打造12306抢票软件价值1680元
- 抖音开发 发布内容至抖音H5
- windows专业版升级企业版
- 微信小程序商城搭建二手交易网站购物+后台管理系统|前后分离VUE.js
- Python:实现double factorial iterative双阶乘迭代算法(附完整源码)
- Logistic回归----葡萄酒案例
- 机器学习之自然语言处理——基于TfidfVectorizer和CountVectorizer及word2vec构建词向量矩阵(代码+原理)
- sql分组查询的使用
- 今年春晚不一样,XR技术如何打造移步换景