二次排序原理

二次排序就是首先按照第一字段排序,然后再对第一字段相同的行按照第二字段排序,注意不能破坏第一次排序的结果。

二次排序技术

假设对应的Key = K有如下值:

(K,V1), (K,V2),…,(K,Vn)

另外假设每个Vi是包含m个属性的一个元组,如下所示:

(Ai1,Ai2,…,Aim)

在这里我们希望按Ai1对归约器的元组的值进行排序。我们将用R表示元组其余的属性:(Ai2,…,Aim),因此,可以把归约器的值表示为:

(K,(A1,R1)),(K,(A2,R2)),…,(K,(An,Rn))

要按Ai对归约器的值进行排序,那么需要创建一个组合键:(K,Ai),新映射器将发出对应的Key=K的键值对,如下表所示:

(K,A1)

(A1,R1)

(K,A2)

(A2,R2)

(K,An)

(An,Rn)

从上表中不难理解,定义组合键为(K,Ai),自然键为K,通过定义组合键(即为自然键增加属性Ai)。假设二次排序使用MapReduce框架对归约器的值进行排序,按照自然键(K)来完成分区。则自然键和组合键排序图如下所示:

假设有这样的一个二次排序问题的例子:考虑一个科学试验得到的温度数据,这样温度的数据如下所示(各列分别为年,月,日,温度):

2015      1     1     10

2015      1     2     11

2015      1     3     12

2015      1     4     13

2015      2     1     22

2015      2     2     23

2015      2     3     24

2015      2     4     25

2015      3     1     20

2015      3     2     21

2015      3     3     22

2015      3     4     23

假设我们希望输出每一个[年-月]的温度,并且值按升序排序。

Spark的二次排序代码实现

1、自定义排序分区,代码如下:

/*** 自定义排序分区**/
class SortPartitioner(partitions: Int) extends Partitioner {require(partitions > 0, s"分区的数量($partitions)必须大于零。")def numPartitions: Int = partitionsdef getPartition(key: Any): Int = key match {case (k: String, v: Int) => math.abs(k.hashCode % numPartitions)case null => 0case _ => math.abs(key.hashCode % numPartitions)}override def equals(other: Any): Boolean = other match {case o: SortPartitioner => o.numPartitions == numPartitionscase _ => false}override def hashCode: Int = numPartitions
}

2、二次排序代码实现

/*** Spark的二次排序**/
object SparkSecondarySort {def main(args: Array[String]): Unit = {if (args.length != 3) {println("输入参数<分区数> <输入路径> <输出路径>不正确")sys.exit(1)}//分区数量val partitions: Int = args(0).toInt//文件输入路径val inputPath: String = args(1)//文件输出路径val outputPath: String = args(2)val config: SparkConf = new SparkConf()config.setMaster("local[1]").setAppName("SparkSecondarySort")//创建Spark上下文val sc: SparkContext = SparkSession.builder().config(config).getOrCreate().sparkContext//读取文件内容val input: RDD[String] = sc.textFile(inputPath)val valueToKey: RDD[((String, Int), Int)] = input.map(x => {val line: Array[String] = x.split("\t")((line(0) + "-" + line(1), line(3).toInt), line(3).toInt)})implicit def tupleOrderingDesc = new Ordering[Tuple2[String, Int]] {override def compare(x: Tuple2[String, Int], y: Tuple2[String, Int]): Int = {if (y._1.compare(x._1) == 0) -y._2.compare(x._2)else -y._1.compare(x._1)}}val sorted: RDD[((String, Int), Int)] = valueToKey.repartitionAndSortWithinPartitions(new SortPartitioner(partitions))val result = sorted.map {case (k, v) => (k._1, v.toString())}.reduceByKey(_ + "," + _)result.saveAsTextFile(outputPath)// donesc.stop()}
}

运行结果:

(2015-1,5,6,7,8,9,10,10,11,11,12,12,13,13,14,14,15,15,16,16,17,17,18,18,19,19,20,20,21,21,22,22)
(2015-3,18,19,20,20,20,21,21,21,22,22,22,23,23,23,24,24,24,25,25,25,26,26,26,27,27,27,28,28,28,29,30)
(2015-2,12,13,14,15,16,17,18,19,20,21,22,22,23,23,24,24,25,25,26,26,27,28,29,30,30,30,31,32)

[Spark的二次排序的实现]相关推荐

  1. Spark学习之路(八):分别使用Java与Scala实现Spark二次排序

    内容简介 一.Spark二次排序的概念 二.实现二次排序的详细步骤(Java语言) 三.二次排序代码演示 1.Java版本 2.Scala版本 四.总结 一.Spark二次排序的概念 排序操作是数据处 ...

  2. spark:sortByKey实现二次排序

    最近在项目中遇到二次排序的需求,和平常开发spark的application一样,开始查看API,编码,调试,验证结果.由于之前对spark的API使用过,知道API中的sortByKey()可以自定 ...

  3. 《spark技术应用》课程期末考试大作业报告,使用eclipse完成求top值、文件排序、二次排序三个程序的个性化开发。

    ​​​​目录 一.选题的目的及要求... 4 二.设计思路... 4 三.主要内容及关键技术.. 5 四.制作步骤... 5 1.准备工作... 5 1.1在VMware中安装一台Ubuntu64位系 ...

  4. 数据算法——Spark二次排序

    1.Scala实现: /*** 二次排序:超过2列(特征)* 对比MR天气案例,自定义一个key(包含读到的一行数字),对key进行内部比较.*/ object SecondSort {def mai ...

  5. 【大数据分析常用算法】1.二次排序

    2019独角兽企业重金招聘Python工程师标准>>> 简介 本算法教程系列建立在您已经有了spark以及Hadoop的开发基础,如果没有的话,请观看本博客的hadoop相关教程或者 ...

  6. 详细讲解MapReduce二次排序过程

    2019独角兽企业重金招聘Python工程师标准>>> 我在15年处理大数据的时候还都是使用MapReduce, 随着时间的推移, 计算工具的发展, 内存越来越便宜, 计算方式也有了 ...

  7. java二次排序_使用java 实现二次排序

    二次排序工具类: import java.io.Serializable; import scala.math.Ordered; /** * @author 作者 E-mail: * @version ...

  8. 大数据【四】MapReduce(单词计数;二次排序;计数器;join;分布式缓存)

       前言: 根据前面的几篇博客学习,现在可以进行MapReduce学习了.本篇博客首先阐述了MapReduce的概念及使用原理,其次直接从五个实验中实践学习(单词计数,二次排序,计数器,join,分 ...

  9. hadoop之MapReduce自定义二次排序流程实例详解

    一.概述 MapReduce框架对处理结果的输出会根据key值进行默认的排序,这个默认排序可以满足一部分需求,但是也是十分有限的.在我们实际的需求当中,往往有要对reduce输出结果进行二次排序的需求 ...

最新文章

  1. C语言字符串操作函数
  2. 【C++】21.函数传参 传指针和传引用的区别
  3. 深入学习c++--智能指针(三) unique_ptr
  4. 转载 maven 详解 http://www.cnblogs.com/binyue/p/4729134.html
  5. git 合并冲突_git分支管理的策略和冲突问题
  6. vmware安装minimal centos报错/etc/rc5.d/s99local : line
  7. html 缩略图点击预览,jQuery图片相册点击缩略图弹出大图预览特效
  8. UE4与WEB服务器交互(json)
  9. AcWing基础算法课Level-2 第四讲 数学知识
  10. Spark笔记整理(一):spark单机安装部署、分布式集群与HA安装部署+spark源码编译...
  11. 算法:找出相同字母组成的字符串Group Anagrams
  12. 浏览器自动打开html怎么办,浏览器自动弹出网页怎么处理?开机自动弹出垃圾网页如何解决?...
  13. 怎么白嫖一部好手机?我来告诉你
  14. LeetCode K站中转内最便宜的航班(回溯法、动态规划)
  15. 酪氨酸激酶的抑制剂——白血病的靶向研究
  16. Vue整合Markdown组件+SpringBoot文件上传+代码差异对比
  17. Python 调用WebService接口出错-suds.transport.TransportError: HTTP Error 401: Unauthori
  18. 【步兵 cocos2dx】热更新(下)
  19. 自动化学报Ctex+texstudio配置方法
  20. 如何看待药物治疗过程中的副作用?

热门文章

  1. 2020优必选算法岗现场面(凉经)
  2. GAT-图注意力模型
  3. 关于poi word 文档生成的那些坑坑洼洼
  4. 本地数据库同步到云主机上
  5. microk8s helm2 Error: no available release name found
  6. Spark 场景题详解
  7. 智慧城市建设 这五个方面不可不考虑
  8. 【好记性不如烂笔头】记一次奇怪的“找不到符号”问题
  9. 【iOS】代码开发中bug管理:bugHD和bugtags
  10. maya要学python吗_Day1 为什么要学Python?