在spark 1.4及以上版本中,针对sparkSQL,添加了很多新的函数,进一步扩展了SparkSQL对数据的处理能力。

本篇介绍一个强大的窗口函数 row_number()函数,常用于对数据进行分组并取每个分组中的TopN数据。

示例数据如下:

class1 90

class2 56

class1 87

class1 76

class2 88

class1 95

class1 74

class2 87

class2 67

class2 77

1、直接使用Spark core中的api来实现分组取topN功能: 
首先将数据源读入代JavaRDD中,然后解析每一行数据,将每一行的第一个元素作为key,第二元素作为value构成tuple的RDD

SparkConf conf = new SparkConf().setAppName("groupTopN").setMaster("local");
JavaSparkContext sc = new JavaSparkContext(conf);
JavaRDD<String> lines = sc.textFile("C:\\Temp\\groupTopN.txt");
JavaPairRDD<String,Integer> pairs = lines.mapToPair(new PairFunction<String, String, Integer>() {
@Overridepublic Tuple2<String, Integer> call(String line) throws Exception { 
   String[] lineSplited = line.split(" ");   
   return new Tuple2<String,Integer>(lineSplited[0],Integer.valueOf(lineSplited[1]));
   }
   } );

得到pairs是一个二元组的RDD,直接调用groupByKey()函数,就可以按照key来进行分组了

JavaPairRDD<String, Iterable<Integer>> grouped = pairs.groupByKey();

分组后每个key对应的这一个value的集合,这里,需要对每个key对应的value集合首先进行排序,然后取其前N个元素即可

JavaPairRDD<String,Iterable> groupedTopN = grouped.mapToPair(new PairFunction<Tuple2<String,Iterable<Integer>>, String, Iterable>() {        @Override
        public Tuple2<String, Iterable> call(Tuple2<String, Iterable<Integer>> values) throws Exception {
            Iterator<Integer> iter = values._2.iterator();
            List<Integer> list = new ArrayList<Integer>();
            while(iter.hasNext()){
                list.add(iter.next());
            }
            //将list中的元素排序
            list.sort(new Comparator<Integer>() {
                @Override
                public int compare(Integer t1, Integer t2) {
                    int i1 = t1;
                    int i2 = t2;
                    return -(i1 - i2);//逆序排列
                }
            });

List<Integer> top3 = list.subList(0, 3);//直接去前3个元素
            return new Tuple2<String,Iterable>(values._1,top3);
        }
    });

为了便于验证,直接咋本地进行测试,并打印显示

groupedTopN.foreach(new VoidFunction<Tuple2<String,Iterable>>() { 
   @Override    
   public void call(Tuple2<String, Iterable> t) 
   throws Exception {
   System.out.println(t._1); 
   Iterator iterator = t._2.iterator();    
   while(iterator.hasNext()){  
   System.out.println(iterator.next()); 
   }        
   System.out.println("====华丽的分割线======="); 
   }});

2、使用SparkSQL的窗口函数来时上同样的功能

思路: 
窗口函数是HiveSQL中特有的,因此,首先将数据导入到hive表中,然后映射到Spark的DataFrame,在sql语句中直接调用窗口函数即可实现该功能

首先,直接在HiveSQL中创建对应的hive表,然后导入本地数据到hive表中

SparkConf conf = new SparkConf().setAppName("WindowFunctionTopN").setMaster("local");
JavaSparkContext sc = new JavaSparkContext(conf);
HiveContext hiveContext = new HiveContext(sc.sc());
//将数据导入到hive表中
hiveContext.sql("DROP TABLE IF EXISTS class_info");
hiveContext.sql("CREATE TABLE IF NOT EXISTS class_info ("        + "class STRING,"        + "score INT");
hiveContext.sql("LOAD DATA "        + "LOCAL INPATH '/cqt/testdata/groupTopN.txt' "        + "INTO TABLE class_info");

然后,直接调用窗口函数row_number(),注意窗口函数的调用语法

<code class="hljs sql has-numbering" style="display: block; padding: 0px; color: inherit; box-sizing: border-box; font-family: 'Source Code Pro', monospace;font-size:undefined; white-space: pre; border-radius: 0px; word-wrap: normal; background: transparent;">DataFrame tom3DF = hiveContext.sql("<span class="hljs-operator" style="box-sizing: border-box;"><span class="hljs-keyword" style="color: rgb(0, 0, 136); box-sizing: border-box;">select</span> class,score <span class="hljs-keyword" style="color: rgb(0, 0, 136); box-sizing: border-box;">from</span><span class="hljs-string" style="color: rgb(0, 136, 0); box-sizing: border-box;">" +"</span>(<span class="hljs-keyword" style="color: rgb(0, 0, 136); box-sizing: border-box;">select</span> class,score,<span class="hljs-string" style="color: rgb(0, 136, 0); box-sizing: border-box;">"+ "</span>row_number() OVER (PARTITION <span class="hljs-keyword" style="color: rgb(0, 0, 136); box-sizing: border-box;">BY</span> class <span class="hljs-keyword" style="color: rgb(0, 0, 136); box-sizing: border-box;">ORDER</span> <span class="hljs-keyword" style="color: rgb(0, 0, 136); box-sizing: border-box;">BY</span> score <span class="hljs-keyword" style="color: rgb(0, 0, 136); box-sizing: border-box;">DESC</span>) rank <span class="hljs-keyword" style="color: rgb(0, 0, 136); box-sizing: border-box;">from</span> class_info) tmp <span class="hljs-keyword" style="color: rgb(0, 0, 136); box-sizing: border-box;">where</span> rank<=<span class="hljs-number" style="color: rgb(0, 102, 102); box-sizing: border-box;">3</span><span class="hljs-string" style="color: rgb(0, 136, 0); box-sizing: border-box;">");</span></span></code><ul class="pre-numbering" style="box-sizing: border-box; position: absolute; width: 50px; top: 0px; left: 0px; margin: 0px; padding: 6px 0px 40px; border-right-width: 1px; border-right-style: solid; border-right-color: rgb(221, 221, 221); list-style: none; text-align: right; background-color: rgb(238, 238, 238);"><li style="box-sizing: border-box; padding: 0px 5px;">1</li><li style="box-sizing: border-box; padding: 0px 5px;">2</li><li></li></ul>

将得到的数据回写到hive表中保存即可

// 将每组排名前3的数据,保存到一个表中

hiveContext.sql("DROP TABLE IF EXISTS grouped_top3"); tom3DF.saveAsTable("grouped_top3");

至此,代码,编写完毕,相比于第一种方式,代码清爽很多!

使用Spark core和SparkSQL的窗口函数分别实现分组取topN的操作相关推荐

  1. Spark分组取TopN

    前言 说到分组,我们很快就想到group by,但是如果在分组的基础上进行取TopN,我们很快又想到开窗函数,group by一般和聚合函数搭配使用,那么聚合函数和开窗函数有啥区别呢? 普通的聚合函数 ...

  2. SparkSQL 与 Spark Core的关系

    不多说,直接上干货! SparkSQL 与 Spark Core的关系 Spark SQL构建在Spark Core之上,专门用来处理结构化数据(不仅仅是SQL). Spark SQL在Spark C ...

  3. sparkcore分区_Spark学习:Spark源码和调优简介 Spark Core (二)

    本文基于 Spark 2.4.4 版本的源码,试图分析其 Core 模块的部分实现原理,其中如有错误,请指正.为了简化论述,将部分细节放到了源码中作为注释,因此正文中是主要内容. 第一部分内容见: S ...

  4. Spark源码和调优简介 Spark Core

    作者:calvinrzluo,腾讯 IEG 后台开发工程师 本文基于 Spark 2.4.4 版本的源码,试图分析其 Core 模块的部分实现原理,其中如有错误,请指正.为了简化论述,将部分细节放到了 ...

  5. Spark Core:Scala单词计数

    Spark Core:Scala单词计数 文章目录 Spark Core:Scala单词计数 1.实验描述 2.实验环境 3.相关技能 4.知识点 5.实验效果 6.实验步骤 8.总结 1.实验描述 ...

  6. spark core、spark sql、spark streaming 联系与区别

    SparkCore 是做离线批处理 SparkSql 是做sql高级查询 SparkStreaming是做流式处理 SparkShell 是做交互式查询 区别: Spark Core : Spark的 ...

  7. spark core面试专题

    1.Spark是什么? Spark是大数据的调度,监控和分配引擎.它是一个快速通用的集群计算平台.Spark扩展了流行的MapReduce模型.Spark提供的主要功能之一就是能够在内存中运行计算 , ...

  8. 2021年大数据Spark(十八):Spark Core的RDD Checkpoint

    目录 RDD Checkpoint 引入 API 代码演示 总结:持久化和Checkpoint的区别 问题: 答案: 区别: RDD Checkpoint 引入 RDD 数据可以持久化,但是持久化/缓 ...

  9. rdd数据存内存 数据量_spark系列:spark core 数据交互技术点(数据模型)

    spark core实现了spark的基本功能:存储交互.任务调度.内存管理.错误恢复等:本片文章主要介绍与数据交互相关的核心技术点. 本文目录: RDD特性及交互 shuffle操作及调优 RDD持 ...

  10. 分布式实时计算—Spark—Spark Core

    原文作者:bingoabin 原文地址:Spark Core 目录 一.Spark Core 1. 主要功能 2. Spark Core子框架 3. Spark架构 4. Spark计算模型 二.组件 ...

最新文章

  1. 机器学习入门(06)— 输出层多元分类、softmax 归一化指数函数以及输出层的神经元数量
  2. vant组件实现上传图片裁剪_如何用 120 行代码,实现交互完整的拖拽上传组件?...
  3. 啊里大鱼短信发送API
  4. 集成学习(ensemble learning)(四)
  5. webservice jsonp格式调用
  6. 7年老Android一次操蛋的面试经历,讲的太透彻了
  7. 交换两个数组 差最小 java_如何交换两个等长整形数组使其数组和的差最小(C和java实现)...
  8. @Autowired和@Resource
  9. XML文档中的xmlns、xmlns:xsi和xsi:schemaLocation
  10. Unknown initial character set index ‘255‘ received from server.
  11. 热烈庆祝blog开通
  12. Linux网卡驱动(4)—DM9000网卡驱动程序完全分析
  13. TorchScript (将动态图转为静态图)(模型部署)(jit)(torch.jit.trace)(torch.jit.script)
  14. Unity3d开发MOBA游戏类《王者荣耀》记录(一)
  15. TFmini和TFmini-Plus——激光雷达模组 资料汇总
  16. 550什么意思_我给女孩子发520,她回550是什么意思?求各路大神指点?
  17. dataframe交换某两行(多行)的数据
  18. 【iOS与EV3混合机器人编程系列之三】编写EV3 Port Viewer 应用监测EV3端口数据
  19. 场景demo落地 - 视频通话产品 2.0 ARCall
  20. Galera Cluste(mysql主从复制)

热门文章

  1. selinux详解及配置文件
  2. LeetCode Student Attendance Record I
  3. Ural1297 最长回文子串(后缀数组+RMQ)
  4. 【如何提取软件图片素材资源】
  5. 跨域请求解决方法(JSONP, CORS)
  6. 【MyBean调试笔记】接口的使用和清理
  7. java提取(获取)博客信息(内容)
  8. JTable表格内容导出execl表
  9. 使用javascript操作cookies的实例
  10. 如何把手机变成你的救生设备