一、分组求最大值

计算文本里面的每个key分组求最大值,输出结果。

二、maven设置

<?xml version="1.0" encoding="UTF-8"?><project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd"><modelVersion>4.0.0</modelVersion><groupId>com.mk</groupId><artifactId>spark-test</artifactId><version>1.0</version><name>spark-test</name><url>http://spark.mk.com</url><properties><project.build.sourceEncoding>UTF-8</project.build.sourceEncoding><maven.compiler.source>1.8</maven.compiler.source><maven.compiler.target>1.8</maven.compiler.target><scala.version>2.11.1</scala.version><spark.version>2.4.4</spark.version><hadoop.version>2.6.0</hadoop.version></properties><dependencies><!-- scala依赖--><dependency><groupId>org.scala-lang</groupId><artifactId>scala-library</artifactId><version>${scala.version}</version></dependency><!-- spark依赖--><dependency><groupId>org.apache.spark</groupId><artifactId>spark-core_2.11</artifactId><version>${spark.version}</version></dependency><dependency><groupId>org.apache.spark</groupId><artifactId>spark-sql_2.11</artifactId><version>${spark.version}</version></dependency><dependency><groupId>junit</groupId><artifactId>junit</artifactId><version>4.11</version><scope>test</scope></dependency></dependencies><build><pluginManagement><plugins><plugin><artifactId>maven-clean-plugin</artifactId><version>3.1.0</version></plugin><plugin><artifactId>maven-resources-plugin</artifactId><version>3.0.2</version></plugin><plugin><artifactId>maven-compiler-plugin</artifactId><version>3.8.0</version></plugin><plugin><artifactId>maven-surefire-plugin</artifactId><version>2.22.1</version></plugin><plugin><artifactId>maven-jar-plugin</artifactId><version>3.0.2</version></plugin></plugins></pluginManagement></build>
</project>

三、编程代码

public class GroupByMaxApp implements SparkConfInfo {public static void main(String[] args) {String filePath = "E:\\spark\\groubByNumber.txt";SparkSession sparkSession = new GroupByMaxApp().getSparkConf("groubByNumber");JavaPairRDD<String, Integer> numbers = sparkSession.sparkContext().textFile(filePath, 4).toJavaRDD().flatMap(v -> Arrays.asList(v.split("\n")).iterator()).mapToPair(v -> {String[] data = v.split("\\s+");if (data.length != 2) {return null;}if (!data[1].matches("-?[0-9]+(.[0-9]+)?"))return null;return new Tuple2<>(data[0], Integer.valueOf(data[1]));}).filter(v -> v != null).cache();//数据量大会溢出内存无法计算
//        numbers.groupByKey()
//                .sortByKey(true)
//                .mapValues(v -> {
//
//                    Integer max = null;
//                    Iterator<Integer> it = v.iterator();
//                    while (it.hasNext()) {
//                        Integer val = it.next();
//                        if(max==null || max<val){
//                            max = val;
//                        }
//                    }
//                    return max;
//                })
//                .collect()
//                .forEach(v -> System.out.println(v._1 + ":" + v._2));//这种聚合数据再计算numbers.combineByKey(max -> max,  // 将val映射为一个元组,作为分区内聚合初始值(max,val) -> {if (max < val) {max = val;}return max;}, //分区内聚合,(a, b) -> Math.max(a, b))   //分区间聚合.sortByKey(true).collect().forEach(v -> System.out.println(v._1 + ":" + v._2));sparkSession.stop();}
}public interface SparkConfInfo {default SparkSession getSparkConf(String appName){SparkConf sparkConf = new SparkConf();if(System.getProperty("os.name").toLowerCase().contains("win")) {sparkConf.setMaster("local[4]");System.out.println("使用本地模拟是spark");}else{sparkConf.setMaster("spark://hadoop01:7077,hadoop02:7077,hadoop03:7077");sparkConf.set("spark.driver.host","192.168.150.1");//本地ip,必须与spark集群能够相互访问,如:同一个局域网sparkConf.setJars(new String[] {".\\out\\artifacts\\spark_test\\spark-test.jar"});//项目构建生成的路径}SparkSession session = SparkSession.builder().appName(appName).config(sparkConf).config(sparkConf).getOrCreate();return session;}
}

groubByNumber.txt文件内容

A 100
A 24
B 43
C 774
D 43
D 37
D 78
E 42
C 68
F 89
G 49
F 543
H 36
E 888
A 258
A 538
B 79
B 6
H 67
C 99

输出

A:538
B:79
C:774
D:78
E:888
F:543
G:49
H:67

四、combineByKey方法

<C> JavaPairRDD<K, C> combineByKey(Function<V, C> createCombiner, Function2<C, V, C> mergeValue, Function2<C, C, C> mergeCombiners);

首先介绍一下上面三个参数:

* Users provide three functions:
*  - `createCombiner`, which turns a V into a C (e.g., creates a one-element list)
这个函数把当前的值作为参数,此时我们可以对其做些附加操作(类型转换)并把它返回 (这一步类似于初始化操作)
*  - `mergeValue`, to merge a V into a C (e.g., adds it to the end of a list)
该函数把元素V合并到之前的元素C(createCombiner)上 (这个操作在每个分区内进行)
*  - `mergeCombiners`, to combine two C's into a single one.
该函数把2个元素C合并 (这个操作在不同分区间进行)

Spark入门(十四)之分组求最大值相关推荐

  1. 2021年大数据Spark(十四):Spark Core的RDD操作

    目录 RDD的操作 函数(算子)分类 Transformation函数 ​​​​​​​Action函数 RDD的操作 有一定开发经验的读者应该都使用过多线程,利用多核 CPU 的并行能力来加快运算速率 ...

  2. Spark入门(四)Idea远程提交项目到spark集群

    一.依赖包配置 scala与spark的相关依赖包,spark包后尾下划线的版本数字要跟scala的版本第一二位要一致,即2.11 pom.xml <?xml version="1.0 ...

  3. java怎样用类模板创建对象_java入门(十四) | 面向对象(OOP)之类和对象

    上一期是变量,在java中变量总是无处不在,而变量其意就是可以改变的数,在一般情况下我们可以以变量类型,变量名,变量值来描述它 这一期是给面向对象(OOP)开了一个头,对他的概念,三大特征有了一个基础 ...

  4. 用java求解一元四次方程_Java程序設計(十四)----一個求一元二次方程根

    * 程序的版權和版本聲明部分 * Copyright (c) 2012, 煙台大學計算機學院學生 * All rights reserved. * 作 者: 劉鎮 * 完成日期: 2012 年 11 ...

  5. slam入门——十四讲笔记(一)

    文章目录 第1讲 预备知识 第1部分 数学基础 第2讲 初识SLAM 2.1 引子:小萝卜的例子 2.2 经典视觉SLAM框架 2.3 SLAM问题的数学表述 2.4 实践:编程基础 1. 安装Lin ...

  6. python3行3列文件数据赋值_Python3快速入门(十四)——Pandas数据读取

    Python3快速入门(十四)--Pandas数据读取 一.DataFrame IO 1.CSV文件 pandas.read_csv(filepath_or_buffer, na_values='NA ...

  7. IM开发者的零基础通信技术入门(十二):上网卡顿?网络掉线?一文即懂!

    [来源申明]本文引用了微信公众号"鲜枣课堂"的<上网慢?经常掉线?这篇文章告诉你该怎么办!>文章内容.为了更好的内容呈现,即时通讯网在引用和收录时内容有改动,转载时请注 ...

  8. Spark入门(十六)之分组求TOP N最小值

    一.分组求TOP N最小值 计算文本里面的每个key分组求TOP N最小值,输出结果. 二.maven设置 <?xml version="1.0" encoding=&quo ...

  9. Spark入门(十五)之分组求最小值

    一.分组求最小值 计算文本里面的每个key分组求最小值,输出结果. 二.maven设置 <?xml version="1.0" encoding="UTF-8&qu ...

最新文章

  1. 2015大型互联网公司校招都开始了,薪资你准备好了嘛?
  2. 【系列索引】结合项目实例 回顾传统设计模式 打造属于自己的模式类系列
  3. Django博客系统(用户中心修改)
  4. 使用spdevelop进行数据库建模
  5. Java应用程序中的性能改进:ORM / JPA
  6. Java学习笔记——显示当前日期的三种方式
  7. DevOps笔记-02:DevOps与微服务之间是什么关系?
  8. tf.unstack\tf.unstack
  9. Hadoop之InputFormat数据输入详解
  10. visual studio Code配置C++环境:
  11. CTA策略05_AtrRsiStrategy
  12. 后缀转中缀表达式_中缀转后缀表达式代码实现(下)及计算器完整版
  13. SQL数据分析之数据提取、数据查询、数据清洗【MySQL速查】
  14. fiddler软件抓包工具超详细配置方法
  15. 共享硬盘没有权限访问计算机,win7系统访问磁盘共享没有权限的解决方法
  16. php如何解析QQ音乐,QQ音乐解析接口分享
  17. 02、江苏专转本(专业课笔记)第二章、计算机的组成原理
  18. nginx 配置https 443端口配置
  19. (二)PUN 2基本教程
  20. python数据分析:商品数据化运营(上)——知识点

热门文章

  1. mysql约束深入了解_MySQL 的约束
  2. vs里面mfc是什么_最近!一大批人正在前往文安,究竟发生了什么?
  3. java实用教程——组件及事件处理——布局的一个小实例
  4. 算法设计与分析———动态规划———最大子段和
  5. Mysql中group by 使用中发现的问题
  6. Java容器的遍历之增强for循环
  7. [计组]压缩BCD码指二进制编码的十进制
  8. [JavaWeb-Servlet]概述与快速入门
  9. 《C++ Primer》7.3.2节练习
  10. ARC068C - Snuke Line