一、分组求平均值

计算文本里面的每个key分组求平均值,输出结果。

二、maven设置

<?xml version="1.0" encoding="UTF-8"?><project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd"><modelVersion>4.0.0</modelVersion><groupId>com.mk</groupId><artifactId>spark-test</artifactId><version>1.0</version><name>spark-test</name><url>http://spark.mk.com</url><properties><project.build.sourceEncoding>UTF-8</project.build.sourceEncoding><maven.compiler.source>1.8</maven.compiler.source><maven.compiler.target>1.8</maven.compiler.target><scala.version>2.11.1</scala.version><spark.version>2.4.4</spark.version><hadoop.version>2.6.0</hadoop.version></properties><dependencies><!-- scala依赖--><dependency><groupId>org.scala-lang</groupId><artifactId>scala-library</artifactId><version>${scala.version}</version></dependency><!-- spark依赖--><dependency><groupId>org.apache.spark</groupId><artifactId>spark-core_2.11</artifactId><version>${spark.version}</version></dependency><dependency><groupId>org.apache.spark</groupId><artifactId>spark-sql_2.11</artifactId><version>${spark.version}</version></dependency><dependency><groupId>junit</groupId><artifactId>junit</artifactId><version>4.11</version><scope>test</scope></dependency></dependencies><build><pluginManagement><plugins><plugin><artifactId>maven-clean-plugin</artifactId><version>3.1.0</version></plugin><plugin><artifactId>maven-resources-plugin</artifactId><version>3.0.2</version></plugin><plugin><artifactId>maven-compiler-plugin</artifactId><version>3.8.0</version></plugin><plugin><artifactId>maven-surefire-plugin</artifactId><version>2.22.1</version></plugin><plugin><artifactId>maven-jar-plugin</artifactId><version>3.0.2</version></plugin></plugins></pluginManagement></build>
</project>

三、编程代码

public class GroupByAvgApp implements SparkConfInfo {public static void main(String[] args) {String filePath = "E:\\spark\\groubByNumber.txt";SparkSession sparkSession = new GroupByAvgApp().getSparkConf("groubByNumber");JavaPairRDD<String, Integer> numbers = sparkSession.sparkContext().textFile(filePath, 4).toJavaRDD().flatMap(v -> Arrays.asList(v.split("\n")).iterator()).mapToPair(v -> {String[] data = v.split("\\s+");if (data.length != 2) {return null;}if (!data[1].matches("-?[0-9]+(.[0-9]+)?"))return null;return new Tuple2<>(data[0], Integer.valueOf(data[1]));}).filter(v -> v != null).cache();//数据量大会溢出内存无法计算
//        numbers.groupByKey()
//                .sortByKey(true)
//                .mapValues(v -> {
//                    double sum = 0;
//                    double count = 0;
//                    Iterator<Integer> it = v.iterator();
//                    while (it.hasNext()) {
//                        sum += it.next();
//                        count++;
//                    }
//                    double avg = sum / count;
//                    return avg;
//                })
//                .collect()
//                .forEach(v -> System.out.println(v._1 + ":" + v._2));//这种聚合数据再计算numbers.combineByKey(val -> new Tuple2<>(val, 1),  // 将val映射为一个元组,作为分区内聚合初始值(t, val) -> new Tuple2<>(t._1() + val, t._2() + 1), //分区内聚合,(a, b) -> new Tuple2<>(a._1() + b._1(), a._2() + b._2()))   //分区间聚合.mapToPair(keyValPair -> {double avg = keyValPair._2()._2() == 0 ? 0 : keyValPair._2()._1() * 1.0 / keyValPair._2()._2();return new Tuple2<>(keyValPair._1(), avg);}).sortByKey(true).collect().forEach(v -> System.out.println(v._1 + ":" + v._2));sparkSession.stop();}
}public interface SparkConfInfo {default SparkSession getSparkConf(String appName){SparkConf sparkConf = new SparkConf();if(System.getProperty("os.name").toLowerCase().contains("win")) {sparkConf.setMaster("local[4]");System.out.println("使用本地模拟是spark");}else{sparkConf.setMaster("spark://hadoop01:7077,hadoop02:7077,hadoop03:7077");sparkConf.set("spark.driver.host","192.168.150.1");//本地ip,必须与spark集群能够相互访问,如:同一个局域网sparkConf.setJars(new String[] {".\\out\\artifacts\\spark_test\\spark-test.jar"});//项目构建生成的路径}SparkSession session = SparkSession.builder().appName(appName).config(sparkConf).config(sparkConf).getOrCreate();return session;}
}

groubByNumber.txt文件内容

A 100
A 24
B 43
C 774
D 43
D 37
D 78
E 42
C 68
F 89
G 49
F 543
H 36
E 888
A 258
A 538
B 79
B 6
H 67
C 99

输出

A:230.0
B:42.666666666666664
C:313.6666666666667
D:52.666666666666664
E:465.0
F:316.0
G:49.0
H:51.5

四、combineByKey方法

<C> JavaPairRDD<K, C> combineByKey(Function<V, C> createCombiner, Function2<C, V, C> mergeValue, Function2<C, C, C> mergeCombiners);

首先介绍一下上面三个参数:

* Users provide three functions:
*  - `createCombiner`, which turns a V into a C (e.g., creates a one-element list)
这个函数把当前的值作为参数,此时我们可以对其做些附加操作(类型转换)并把它返回 (这一步类似于初始化操作)
*  - `mergeValue`, to merge a V into a C (e.g., adds it to the end of a list)
该函数把元素V合并到之前的元素C(createCombiner)上 (这个操作在每个分区内进行)
*  - `mergeCombiners`, to combine two C's into a single one.
该函数把2个元素C合并 (这个操作在不同分区间进行)

Spark入门(十三)之分组求平均值相关推荐

  1. python筛选数据求均值_Python Pandas实现数据分组求平均值并填充nan的示例

    Python实现按某一列关键字分组,并计算各列的平均值,并用该值填充该分类该列的nan值. DataFrame数据格式 fillna方式实现 groupby方式实现 DataFrame数据格式 以下是 ...

  2. R语言分组求和,分组求平均值,分组计数

    我们经常可能需要把一个数据按照某一属性分组,然后计算一些统计值.在R语言里面,aggregate函数就可以办到. ## S3 method for class 'data.frame' aggrega ...

  3. pandas: groupby()分组求平均值

    两种方式都可以 data.groupby('Pclass')['Fare'].mean() data.groupby('Pclass').mean()['Fare']

  4. Spark入门(十六)之分组求TOP N最小值

    一.分组求TOP N最小值 计算文本里面的每个key分组求TOP N最小值,输出结果. 二.maven设置 <?xml version="1.0" encoding=&quo ...

  5. Spark入门(十五)之分组求最小值

    一.分组求最小值 计算文本里面的每个key分组求最小值,输出结果. 二.maven设置 <?xml version="1.0" encoding="UTF-8&qu ...

  6. Spark入门(十四)之分组求最大值

    一.分组求最大值 计算文本里面的每个key分组求最大值,输出结果. 二.maven设置 <?xml version="1.0" encoding="UTF-8&qu ...

  7. C# Linq to Entity Lamda方式分组并求和求平均值

    1.单字段分组并求和: var list = data.GroupBy(g => g.GoodsId).Select(e => new { GoodsId = e.Key, Qty = e ...

  8. 新手向,从用Spark求平均值到reduceByKey详解

    1.前因后果 在网上看到了一种用Spark求平均值的算法,自己写了下,修改了一些错误,我这是能直接run起来的版本.我会在本文中对这段代码进行详细的讲解,以加强对reduceByKey用法的印象.耐心 ...

  9. oracle中更新一列分组的均值,oracle 分组平均后又求平均值的方法

    oracle 分组平均后再求平均值的方法 请教大家一个问题,是分组平均后再平均的方法 有一张表,记录物品的价格 名称   销售日期    价格 A     1日          2 A     2日 ...

最新文章

  1. WinForm容器内控件批量效验是否允许为空?设置是否只读?设置是否可用等方法分享...
  2. 职场笔记[0702]:用户体验和创业
  3. 山东省102021年普通高考成绩查询,山东高考成绩今日发布!成绩查询看这里!
  4. phpexcel导出后乱码或者是打不开文件必须修复的问题
  5. 【Flink】 Flink与Kafka版本对应关系
  6. JZOJ 3426. 封印一击
  7. Flutter实战一Flutter聊天应用(九)
  8. jsp水果商城系统毕业设计网站成品论文
  9. bert性能优化之——用另一种方式整合多头注意力
  10. JPDA 架构研究19 - JDI的连接模块
  11. H5+springboot(集成ffmpeg)实现前端视频录制以及webm格式转mp4
  12. H5跳转到移动端地图网页打开地图App、Apple地图、百度、腾讯、高德地图导航
  13. 最流行的统计假设检验速查表
  14. SSD的priorbox层
  15. 微信PC端网站 微信扫码登陆
  16. peda 官方帮助文档
  17. 生物统计学(biostatistics)学习笔记(五)卡方检验
  18. oracle 无metalink账号补丁下载方法
  19. Tests run: 0, Failures: 0, Errors: 0, Skipped: 0
  20. doc和docx的区别

热门文章

  1. 135. 分发糖果002(贪心算法+思路+详解)
  2. 7-37 模拟EXCEL排序 (25 分)(思路+详解+超时解决 兄弟们冲呀呀呀呀呀呀)
  3. ES6的Set和Map你都知道吗?一文了解集合和字典在前端中的应用
  4. 蓝桥杯真题-连号区间数-枚举
  5. python3.7和3.5_Ubuntu更新python3.5到python3.7
  6. php怎么上传函数,PHP单文件上传原理及上传函数的封装操作示例
  7. 2019-02-26-算法-进化(回文数)
  8. 不要666升级版(数位DP,三次方和)
  9. #3328. PYXFIB(单位根反演)
  10. CF1497C k-LCM