最近在项目中遇到二次排序的需求,和平常开发spark的application一样,开始查看API,编码,调试,验证结果。由于之前对spark的API使用过,知道API中的sortByKey()可以自定义排序规则,通过实现自定义的排序规则来实现二次排序。 
这里为了说明问题,举了一个简单的例子,key是由两部分组成的,我们这里按key的第一部分的降序排,key的第二部分升序排,具体如下:

JavaSparkContext javaSparkContext = new JavaSparkContext(sparkConf);List<Integer> data = Arrays.asList(5, 1, 1, 4, 4, 2, 2);JavaRDD<Integer> javaRDD = javaSparkContext.parallelize(data);final Random random = new Random(100);JavaPairRDD javaPairRDD = javaRDD.mapToPair(new PairFunction<Integer, String, Integer>() {    @Override    public Tuple2<String, Integer> call(Integer integer) throws Exception {        return new Tuple2<String, Integer>(Integer.toString(integer) + " " + random.nextInt(10),random.nextInt(10));   }
});JavaPairRDD<String,Integer> sortByKeyRDD = javaPairRDD.sortByKey(new Comparator<String>() {    @Override    public int compare(String o1, String o2) {        String []o1s = o1.split(" ");        String []o2s = o2.split(" ");       if(o1s[0].compareTo(o2s[0]) == 0)            return o1s[1].compareTo(o2s[1]);        else            return -o1s[0].compareTo(o2s[0]);    }
});
System.out.println("~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~" + sortByKeyRDD.collect());

上面编码从语法上没有什么问题,可是运行下报了如下错误:

java.lang.reflect.InvocationTargetExceptionat sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:57)at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)at java.lang.reflect.Method.invoke(Method.java:606)at org.apache.spark.serializer.SerializationDebugger$ObjectStreamClassMethods$.getObjFieldValues$extension(SerializationDebugger.scala:248)at org.apache.spark.serializer.SerializationDebugger$SerializationDebugger.visitSerializable(SerializationDebugger.scala:158)at org.apache.spark.serializer.SerializationDebugger$SerializationDebugger.visit(SerializationDebugger.scala:107)at org.apache.spark.serializer.SerializationDebugger$SerializationDebugger.visitSerializable(SerializationDebugger.scala:166)at org.apache.spark.serializer.SerializationDebugger$SerializationDebugger.visit(SerializationDebugger.scala:107)at org.apache.spark.serializer.SerializationDebugger$SerializationDebugger.visitSerializable(SerializationDebugger.scala:166)at org.apache.spark.serializer.SerializationDebugger$SerializationDebugger.visit(SerializationDebugger.scala:107)at org.apache.spark.serializer.SerializationDebugger$.find(SerializationDebugger.scala:66)at org.apache.spark.serializer.SerializationDebugger$.improveException(SerializationDebugger.scala:41)at org.apache.spark.serializer.JavaSerializationStream.writeObject(JavaSerializer.scala:47)at org.apache.spark.serializer.JavaSerializerInstance.serialize(JavaSerializer.scala:81)at org.apache.spark.util.ClosureCleaner$.ensureSerializable(ClosureCleaner.scala:312)at org.apache.spark.util.ClosureCleaner$.org$apache$spark$util$ClosureCleaner$$clean(ClosureCleaner.scala:305)at org.apache.spark.util.ClosureCleaner$.clean(ClosureCleaner.scala:132)at org.apache.spark.SparkContext.clean(SparkContext.scala:1891)at org.apache.spark.SparkContext.runJob(SparkContext.scala:1764)at org.apache.spark.SparkContext.runJob(SparkContext.scala:1779)at org.apache.spark.rdd.RDD$$anonfun$collect$1.apply(RDD.scala:885)at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:148)at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:109)at org.apache.spark.rdd.RDD.withScope(RDD.scala:286)at org.apache.spark.rdd.RDD.collect(RDD.scala:884)at org.apache.spark.api.java.JavaRDDLike$class.collect(JavaRDDLike.scala:335)at org.apache.spark.api.java.AbstractJavaRDDLike.collect(JavaRDDLike.scala:47)

因此,我再次去查看相应的spark Java API文档,但是我没有发现任何指明错误的地方。好吧,那只能扒下源码吧,在javaPairRDD中

def sortByKey(comp: Comparator[K], ascending: Boolean): JavaPairRDD[K, V] = {  implicit val ordering = comp // Allow implicit conversion of Comparator to Ordering.  fromRDD(new OrderedRDDFunctions[K, V, (K, V)](rdd).sortByKey(ascending))
}

其实在OrderedRDDFunctions类中有个变量ordering它是隐形的:private val ordering = implicitly[Ordering[K]]。他就是默认的排序规则,我们自己重写的comp就修改了默认的排序规则。到这里还是没有发现问题,但是发现类OrderedRDDFunctions extends Logging with Serializable,又回到上面的报错信息,扫描到“serializable”!!!因此,返回上述代码,查看Comparator interface实现,发现原来是它没有extend Serializable,故只需创建一个 serializable的comparator就可以:public interface SerializableComparator<T> extends Comparator<T>, Serializable { }. 
具体如下:

private static class Comp implements Comparator<String>,Serializable{    @Override    public int compare(String o1, String o2) {            String []o1s = o1.split(" ");            String []o2s = o2.split(" ");            if(o1s[0].compareTo(o2s[0]) == 0)                return o1s[1].compareTo(o2s[1]);elsereturn -o1s[0].compareTo(o2s[0]);    }
}
JavaPairRDD<String,Integer> sortByKeyRDD = javaPairRDD.sortByKey(new Comp());

总结下,在spark的Java API中,如果需要使用Comparator接口,须注意是否需要序列化,如sortByKey(),repartitionAndSortWithinPartitions()等都是需要序列化的。

spark:sortByKey实现二次排序相关推荐

  1. 《spark技术应用》课程期末考试大作业报告,使用eclipse完成求top值、文件排序、二次排序三个程序的个性化开发。

    ​​​​目录 一.选题的目的及要求... 4 二.设计思路... 4 三.主要内容及关键技术.. 5 四.制作步骤... 5 1.准备工作... 5 1.1在VMware中安装一台Ubuntu64位系 ...

  2. Spark学习之路(八):分别使用Java与Scala实现Spark二次排序

    内容简介 一.Spark二次排序的概念 二.实现二次排序的详细步骤(Java语言) 三.二次排序代码演示 1.Java版本 2.Scala版本 四.总结 一.Spark二次排序的概念 排序操作是数据处 ...

  3. 数据算法——Spark二次排序

    1.Scala实现: /*** 二次排序:超过2列(特征)* 对比MR天气案例,自定义一个key(包含读到的一行数字),对key进行内部比较.*/ object SecondSort {def mai ...

  4. [Spark的二次排序的实现]

    二次排序原理 二次排序就是首先按照第一字段排序,然后再对第一字段相同的行按照第二字段排序,注意不能破坏第一次排序的结果. 二次排序技术 假设对应的Key = K有如下值: (K,V1), (K,V2) ...

  5. java二次排序_使用java 实现二次排序

    二次排序工具类: import java.io.Serializable; import scala.math.Ordered; /** * @author 作者 E-mail: * @version ...

  6. Kafka与Spark集成系列二Spark的安装及简单应用

    原 [Kafka与Spark集成系列二] Spark的安装及简单应用https://blog.csdn.net/u013256816/article/details/82082019版权声明:本文为博 ...

  7. Python+大数据-Spark技术栈(二)SparkBaseCore

    Python+大数据-Spark技术栈(二)SparkBase&Core 学习目标 掌握SparkOnYarn搭建 掌握RDD的基础创建及相关算子操作 了解PySpark的架构及角色 环境搭建 ...

  8. 【大数据分析常用算法】1.二次排序

    2019独角兽企业重金招聘Python工程师标准>>> 简介 本算法教程系列建立在您已经有了spark以及Hadoop的开发基础,如果没有的话,请观看本博客的hadoop相关教程或者 ...

  9. 详细讲解MapReduce二次排序过程

    2019独角兽企业重金招聘Python工程师标准>>> 我在15年处理大数据的时候还都是使用MapReduce, 随着时间的推移, 计算工具的发展, 内存越来越便宜, 计算方式也有了 ...

最新文章

  1. Gradle dependency cache may be corrupt
  2. 使用Maven管理Java项目
  3. 聚类的评价方法 轮廓系数
  4. redis学习(七)jedis客户端
  5. Java学习从入门到精通(1) [转载]
  6. Netty--ChannelHandler和ChannelPipeline
  7. totolinkn200up怎么设置_totolinkN200R无线路由器如何设置啊,求高人指点
  8. Leetcode--837. 新21点(java)
  9. 发际线预警!10本程序员必读烧脑经典,你敢挑战一本吗?
  10. Android学习点点滴滴之获取正在运行的进程
  11. 多线程、多核技术是什么技术?
  12. 2019web前端开发视频教程资料(汇总整理)
  13. 推荐几款大家常使用的 SSH 客户端工具
  14. win7下配置python_win7如何配置Python环境变量
  15. Spark Shuffle之Tungsten-Sort
  16. 微积分小糊涂,国庆节大快乐!
  17. 新手如何成为一名黑客
  18. ElasticSeach 监控之cerebro
  19. esp32 鸿蒙,ESP8266最佳开发板--ESP-LAUNCHER开发板评测
  20. 域名和URL是什么?域名和IP是一一对应的吗?

热门文章

  1. oracle转成整型_Oracle中如何用SQL把字符串转换成整型
  2. php 接口说明文档,phpwind文章中心接口说明
  3. Apache连接和访问控制
  4. Python中的数值类型
  5. opencv轻松入门面向python下载_OpenCV轻松入门:面向Python
  6. php上个月的最后一天,在PHP中查找上个月的最后一天
  7. windows和linux共同分区格式,Linux和Windows共存的模式下分区要小心
  8. redis ubuntu php 5.2,ubuntu 14.04下简易安装php5.5 + apache2 + redis + mysql
  9. python pillow环境_Python环境Pillow( PIL )图像处理工具使用解析
  10. data-index在react里怎样表达_如何自我训练,提高表达能力