使用Spark core和SparkSQL的窗口函数分别实现分组取topN的操作
在spark 1.4及以上版本中,针对sparkSQL,添加了很多新的函数,进一步扩展了SparkSQL对数据的处理能力。
本篇介绍一个强大的窗口函数 row_number()函数,常用于对数据进行分组并取每个分组中的TopN数据。
示例数据如下:
class1 90
class2 56
class1 87
class1 76
class2 88
class1 95
class1 74
class2 87
class2 67
class2 77
1、直接使用Spark core中的api来实现分组取topN功能:
首先将数据源读入代JavaRDD中,然后解析每一行数据,将每一行的第一个元素作为key,第二元素作为value构成tuple的RDD
SparkConf conf = new SparkConf().setAppName("groupTopN").setMaster("local");
JavaSparkContext sc = new JavaSparkContext(conf);
JavaRDD<String> lines = sc.textFile("C:\\Temp\\groupTopN.txt");
JavaPairRDD<String,Integer> pairs = lines.mapToPair(new PairFunction<String, String, Integer>() {
@Overridepublic Tuple2<String, Integer> call(String line) throws Exception {
String[] lineSplited = line.split(" ");
return new Tuple2<String,Integer>(lineSplited[0],Integer.valueOf(lineSplited[1]));
}
} );
得到pairs是一个二元组的RDD,直接调用groupByKey()函数,就可以按照key来进行分组了
JavaPairRDD<String, Iterable<Integer>> grouped = pairs.groupByKey();
分组后每个key对应的这一个value的集合,这里,需要对每个key对应的value集合首先进行排序,然后取其前N个元素即可
JavaPairRDD<String,Iterable> groupedTopN = grouped.mapToPair(new PairFunction<Tuple2<String,Iterable<Integer>>, String, Iterable>() { @Override
public Tuple2<String, Iterable> call(Tuple2<String, Iterable<Integer>> values) throws Exception {
Iterator<Integer> iter = values._2.iterator();
List<Integer> list = new ArrayList<Integer>();
while(iter.hasNext()){
list.add(iter.next());
}
//将list中的元素排序
list.sort(new Comparator<Integer>() {
@Override
public int compare(Integer t1, Integer t2) {
int i1 = t1;
int i2 = t2;
return -(i1 - i2);//逆序排列
}
});
List<Integer> top3 = list.subList(0, 3);//直接去前3个元素
return new Tuple2<String,Iterable>(values._1,top3);
}
});
为了便于验证,直接咋本地进行测试,并打印显示
groupedTopN.foreach(new VoidFunction<Tuple2<String,Iterable>>() {
@Override
public void call(Tuple2<String, Iterable> t)
throws Exception {
System.out.println(t._1);
Iterator iterator = t._2.iterator();
while(iterator.hasNext()){
System.out.println(iterator.next());
}
System.out.println("====华丽的分割线=======");
}});
2、使用SparkSQL的窗口函数来时上同样的功能
思路:
窗口函数是HiveSQL中特有的,因此,首先将数据导入到hive表中,然后映射到Spark的DataFrame,在sql语句中直接调用窗口函数即可实现该功能
首先,直接在HiveSQL中创建对应的hive表,然后导入本地数据到hive表中
SparkConf conf = new SparkConf().setAppName("WindowFunctionTopN").setMaster("local");
JavaSparkContext sc = new JavaSparkContext(conf);
HiveContext hiveContext = new HiveContext(sc.sc());
//将数据导入到hive表中
hiveContext.sql("DROP TABLE IF EXISTS class_info");
hiveContext.sql("CREATE TABLE IF NOT EXISTS class_info (" + "class STRING," + "score INT");
hiveContext.sql("LOAD DATA " + "LOCAL INPATH '/cqt/testdata/groupTopN.txt' " + "INTO TABLE class_info");
然后,直接调用窗口函数row_number(),注意窗口函数的调用语法
<code class="hljs sql has-numbering" style="display: block; padding: 0px; color: inherit; box-sizing: border-box; font-family: 'Source Code Pro', monospace;font-size:undefined; white-space: pre; border-radius: 0px; word-wrap: normal; background: transparent;">DataFrame tom3DF = hiveContext.sql("<span class="hljs-operator" style="box-sizing: border-box;"><span class="hljs-keyword" style="color: rgb(0, 0, 136); box-sizing: border-box;">select</span> class,score <span class="hljs-keyword" style="color: rgb(0, 0, 136); box-sizing: border-box;">from</span><span class="hljs-string" style="color: rgb(0, 136, 0); box-sizing: border-box;">" +"</span>(<span class="hljs-keyword" style="color: rgb(0, 0, 136); box-sizing: border-box;">select</span> class,score,<span class="hljs-string" style="color: rgb(0, 136, 0); box-sizing: border-box;">"+ "</span>row_number() OVER (PARTITION <span class="hljs-keyword" style="color: rgb(0, 0, 136); box-sizing: border-box;">BY</span> class <span class="hljs-keyword" style="color: rgb(0, 0, 136); box-sizing: border-box;">ORDER</span> <span class="hljs-keyword" style="color: rgb(0, 0, 136); box-sizing: border-box;">BY</span> score <span class="hljs-keyword" style="color: rgb(0, 0, 136); box-sizing: border-box;">DESC</span>) rank <span class="hljs-keyword" style="color: rgb(0, 0, 136); box-sizing: border-box;">from</span> class_info) tmp <span class="hljs-keyword" style="color: rgb(0, 0, 136); box-sizing: border-box;">where</span> rank<=<span class="hljs-number" style="color: rgb(0, 102, 102); box-sizing: border-box;">3</span><span class="hljs-string" style="color: rgb(0, 136, 0); box-sizing: border-box;">");</span></span></code><ul class="pre-numbering" style="box-sizing: border-box; position: absolute; width: 50px; top: 0px; left: 0px; margin: 0px; padding: 6px 0px 40px; border-right-width: 1px; border-right-style: solid; border-right-color: rgb(221, 221, 221); list-style: none; text-align: right; background-color: rgb(238, 238, 238);"><li style="box-sizing: border-box; padding: 0px 5px;">1</li><li style="box-sizing: border-box; padding: 0px 5px;">2</li><li></li></ul>
将得到的数据回写到hive表中保存即可
// 将每组排名前3的数据,保存到一个表中
hiveContext.sql("DROP TABLE IF EXISTS grouped_top3"); tom3DF.saveAsTable("grouped_top3");
至此,代码,编写完毕,相比于第一种方式,代码清爽很多!
使用Spark core和SparkSQL的窗口函数分别实现分组取topN的操作相关推荐
- Spark分组取TopN
前言 说到分组,我们很快就想到group by,但是如果在分组的基础上进行取TopN,我们很快又想到开窗函数,group by一般和聚合函数搭配使用,那么聚合函数和开窗函数有啥区别呢? 普通的聚合函数 ...
- SparkSQL 与 Spark Core的关系
不多说,直接上干货! SparkSQL 与 Spark Core的关系 Spark SQL构建在Spark Core之上,专门用来处理结构化数据(不仅仅是SQL). Spark SQL在Spark C ...
- sparkcore分区_Spark学习:Spark源码和调优简介 Spark Core (二)
本文基于 Spark 2.4.4 版本的源码,试图分析其 Core 模块的部分实现原理,其中如有错误,请指正.为了简化论述,将部分细节放到了源码中作为注释,因此正文中是主要内容. 第一部分内容见: S ...
- Spark源码和调优简介 Spark Core
作者:calvinrzluo,腾讯 IEG 后台开发工程师 本文基于 Spark 2.4.4 版本的源码,试图分析其 Core 模块的部分实现原理,其中如有错误,请指正.为了简化论述,将部分细节放到了 ...
- Spark Core:Scala单词计数
Spark Core:Scala单词计数 文章目录 Spark Core:Scala单词计数 1.实验描述 2.实验环境 3.相关技能 4.知识点 5.实验效果 6.实验步骤 8.总结 1.实验描述 ...
- spark core、spark sql、spark streaming 联系与区别
SparkCore 是做离线批处理 SparkSql 是做sql高级查询 SparkStreaming是做流式处理 SparkShell 是做交互式查询 区别: Spark Core : Spark的 ...
- spark core面试专题
1.Spark是什么? Spark是大数据的调度,监控和分配引擎.它是一个快速通用的集群计算平台.Spark扩展了流行的MapReduce模型.Spark提供的主要功能之一就是能够在内存中运行计算 , ...
- 2021年大数据Spark(十八):Spark Core的RDD Checkpoint
目录 RDD Checkpoint 引入 API 代码演示 总结:持久化和Checkpoint的区别 问题: 答案: 区别: RDD Checkpoint 引入 RDD 数据可以持久化,但是持久化/缓 ...
- rdd数据存内存 数据量_spark系列:spark core 数据交互技术点(数据模型)
spark core实现了spark的基本功能:存储交互.任务调度.内存管理.错误恢复等:本片文章主要介绍与数据交互相关的核心技术点. 本文目录: RDD特性及交互 shuffle操作及调优 RDD持 ...
- 分布式实时计算—Spark—Spark Core
原文作者:bingoabin 原文地址:Spark Core 目录 一.Spark Core 1. 主要功能 2. Spark Core子框架 3. Spark架构 4. Spark计算模型 二.组件 ...
最新文章
- 机器学习入门(06)— 输出层多元分类、softmax 归一化指数函数以及输出层的神经元数量
- vant组件实现上传图片裁剪_如何用 120 行代码,实现交互完整的拖拽上传组件?...
- 啊里大鱼短信发送API
- 集成学习(ensemble learning)(四)
- webservice jsonp格式调用
- 7年老Android一次操蛋的面试经历,讲的太透彻了
- 交换两个数组 差最小 java_如何交换两个等长整形数组使其数组和的差最小(C和java实现)...
- @Autowired和@Resource
- XML文档中的xmlns、xmlns:xsi和xsi:schemaLocation
- Unknown initial character set index ‘255‘ received from server.
- 热烈庆祝blog开通
- Linux网卡驱动(4)—DM9000网卡驱动程序完全分析
- TorchScript (将动态图转为静态图)(模型部署)(jit)(torch.jit.trace)(torch.jit.script)
- Unity3d开发MOBA游戏类《王者荣耀》记录(一)
- TFmini和TFmini-Plus——激光雷达模组 资料汇总
- 550什么意思_我给女孩子发520,她回550是什么意思?求各路大神指点?
- dataframe交换某两行(多行)的数据
- 【iOS与EV3混合机器人编程系列之三】编写EV3 Port Viewer 应用监测EV3端口数据
- 场景demo落地 - 视频通话产品 2.0 ARCall
- Galera Cluste(mysql主从复制)