[Spark的二次排序的实现]
二次排序原理
二次排序就是首先按照第一字段排序,然后再对第一字段相同的行按照第二字段排序,注意不能破坏第一次排序的结果。
二次排序技术
假设对应的Key = K有如下值:
(K,V1), (K,V2),…,(K,Vn)
另外假设每个Vi是包含m个属性的一个元组,如下所示:
(Ai1,Ai2,…,Aim)
在这里我们希望按Ai1对归约器的元组的值进行排序。我们将用R表示元组其余的属性:(Ai2,…,Aim),因此,可以把归约器的值表示为:
(K,(A1,R1)),(K,(A2,R2)),…,(K,(An,Rn))
要按Ai对归约器的值进行排序,那么需要创建一个组合键:(K,Ai),新映射器将发出对应的Key=K的键值对,如下表所示:
键 |
值 |
(K,A1) |
(A1,R1) |
(K,A2) |
(A2,R2) |
… |
… |
(K,An) |
(An,Rn) |
从上表中不难理解,定义组合键为(K,Ai),自然键为K,通过定义组合键(即为自然键增加属性Ai)。假设二次排序使用MapReduce框架对归约器的值进行排序,按照自然键(K)来完成分区。则自然键和组合键排序图如下所示:
假设有这样的一个二次排序问题的例子:考虑一个科学试验得到的温度数据,这样温度的数据如下所示(各列分别为年,月,日,温度):
2015 1 1 10
2015 1 2 11
2015 1 3 12
2015 1 4 13
…
2015 2 1 22
2015 2 2 23
2015 2 3 24
2015 2 4 25
…
2015 3 1 20
2015 3 2 21
2015 3 3 22
2015 3 4 23
假设我们希望输出每一个[年-月]的温度,并且值按升序排序。
Spark的二次排序代码实现
1、自定义排序分区,代码如下:
/*** 自定义排序分区**/
class SortPartitioner(partitions: Int) extends Partitioner {require(partitions > 0, s"分区的数量($partitions)必须大于零。")def numPartitions: Int = partitionsdef getPartition(key: Any): Int = key match {case (k: String, v: Int) => math.abs(k.hashCode % numPartitions)case null => 0case _ => math.abs(key.hashCode % numPartitions)}override def equals(other: Any): Boolean = other match {case o: SortPartitioner => o.numPartitions == numPartitionscase _ => false}override def hashCode: Int = numPartitions
}
2、二次排序代码实现
/*** Spark的二次排序**/
object SparkSecondarySort {def main(args: Array[String]): Unit = {if (args.length != 3) {println("输入参数<分区数> <输入路径> <输出路径>不正确")sys.exit(1)}//分区数量val partitions: Int = args(0).toInt//文件输入路径val inputPath: String = args(1)//文件输出路径val outputPath: String = args(2)val config: SparkConf = new SparkConf()config.setMaster("local[1]").setAppName("SparkSecondarySort")//创建Spark上下文val sc: SparkContext = SparkSession.builder().config(config).getOrCreate().sparkContext//读取文件内容val input: RDD[String] = sc.textFile(inputPath)val valueToKey: RDD[((String, Int), Int)] = input.map(x => {val line: Array[String] = x.split("\t")((line(0) + "-" + line(1), line(3).toInt), line(3).toInt)})implicit def tupleOrderingDesc = new Ordering[Tuple2[String, Int]] {override def compare(x: Tuple2[String, Int], y: Tuple2[String, Int]): Int = {if (y._1.compare(x._1) == 0) -y._2.compare(x._2)else -y._1.compare(x._1)}}val sorted: RDD[((String, Int), Int)] = valueToKey.repartitionAndSortWithinPartitions(new SortPartitioner(partitions))val result = sorted.map {case (k, v) => (k._1, v.toString())}.reduceByKey(_ + "," + _)result.saveAsTextFile(outputPath)// donesc.stop()}
}
运行结果:
(2015-1,5,6,7,8,9,10,10,11,11,12,12,13,13,14,14,15,15,16,16,17,17,18,18,19,19,20,20,21,21,22,22)
(2015-3,18,19,20,20,20,21,21,21,22,22,22,23,23,23,24,24,24,25,25,25,26,26,26,27,27,27,28,28,28,29,30)
(2015-2,12,13,14,15,16,17,18,19,20,21,22,22,23,23,24,24,25,25,26,26,27,28,29,30,30,30,31,32)
[Spark的二次排序的实现]相关推荐
- Spark学习之路(八):分别使用Java与Scala实现Spark二次排序
内容简介 一.Spark二次排序的概念 二.实现二次排序的详细步骤(Java语言) 三.二次排序代码演示 1.Java版本 2.Scala版本 四.总结 一.Spark二次排序的概念 排序操作是数据处 ...
- spark:sortByKey实现二次排序
最近在项目中遇到二次排序的需求,和平常开发spark的application一样,开始查看API,编码,调试,验证结果.由于之前对spark的API使用过,知道API中的sortByKey()可以自定 ...
- 《spark技术应用》课程期末考试大作业报告,使用eclipse完成求top值、文件排序、二次排序三个程序的个性化开发。
目录 一.选题的目的及要求... 4 二.设计思路... 4 三.主要内容及关键技术.. 5 四.制作步骤... 5 1.准备工作... 5 1.1在VMware中安装一台Ubuntu64位系 ...
- 数据算法——Spark二次排序
1.Scala实现: /*** 二次排序:超过2列(特征)* 对比MR天气案例,自定义一个key(包含读到的一行数字),对key进行内部比较.*/ object SecondSort {def mai ...
- 【大数据分析常用算法】1.二次排序
2019独角兽企业重金招聘Python工程师标准>>> 简介 本算法教程系列建立在您已经有了spark以及Hadoop的开发基础,如果没有的话,请观看本博客的hadoop相关教程或者 ...
- 详细讲解MapReduce二次排序过程
2019独角兽企业重金招聘Python工程师标准>>> 我在15年处理大数据的时候还都是使用MapReduce, 随着时间的推移, 计算工具的发展, 内存越来越便宜, 计算方式也有了 ...
- java二次排序_使用java 实现二次排序
二次排序工具类: import java.io.Serializable; import scala.math.Ordered; /** * @author 作者 E-mail: * @version ...
- 大数据【四】MapReduce(单词计数;二次排序;计数器;join;分布式缓存)
前言: 根据前面的几篇博客学习,现在可以进行MapReduce学习了.本篇博客首先阐述了MapReduce的概念及使用原理,其次直接从五个实验中实践学习(单词计数,二次排序,计数器,join,分 ...
- hadoop之MapReduce自定义二次排序流程实例详解
一.概述 MapReduce框架对处理结果的输出会根据key值进行默认的排序,这个默认排序可以满足一部分需求,但是也是十分有限的.在我们实际的需求当中,往往有要对reduce输出结果进行二次排序的需求 ...
最新文章
- C语言字符串操作函数
- 【C++】21.函数传参 传指针和传引用的区别
- 深入学习c++--智能指针(三) unique_ptr
- 转载 maven 详解 http://www.cnblogs.com/binyue/p/4729134.html
- git 合并冲突_git分支管理的策略和冲突问题
- vmware安装minimal centos报错/etc/rc5.d/s99local : line
- html 缩略图点击预览,jQuery图片相册点击缩略图弹出大图预览特效
- UE4与WEB服务器交互(json)
- AcWing基础算法课Level-2 第四讲 数学知识
- Spark笔记整理(一):spark单机安装部署、分布式集群与HA安装部署+spark源码编译...
- 算法:找出相同字母组成的字符串Group Anagrams
- 浏览器自动打开html怎么办,浏览器自动弹出网页怎么处理?开机自动弹出垃圾网页如何解决?...
- 怎么白嫖一部好手机?我来告诉你
- LeetCode K站中转内最便宜的航班(回溯法、动态规划)
- 酪氨酸激酶的抑制剂——白血病的靶向研究
- Vue整合Markdown组件+SpringBoot文件上传+代码差异对比
- Python 调用WebService接口出错-suds.transport.TransportError: HTTP Error 401: Unauthori
- 【步兵 cocos2dx】热更新(下)
- 自动化学报Ctex+texstudio配置方法
- 如何看待药物治疗过程中的副作用?