键值对聚合操作(reduceByKey,foldByKey,sortByKey, join)

  • 1. reduceByKey
    • scala版本
    • java版本
  • 2. foldByKey
    • scala版本
  • 3. SortByKey
    • scala版本
    • java版本
  • 4. groupByKey
    • scala版本
    • java版本
  • 5. cogroup
    • scala版本
    • java版本
  • 6. subtractByKey
  • 7. join,fullOuterJoin, rightOuterJoin, leftOuterJoin
    • scala版本
    • java版本

1. reduceByKey

def reduceByKey(func: (V, V) => V): RDD[(K, V)]def reduceByKey(func: (V, V) => V, numPartitions: Int): RDD[(K, V)]def reduceByKey(partitioner: Partitioner, func: (V, V) => V): RDD[(K, V)]

接收一个函数,按照相同的key进行reduce操作,类似于scala的reduce的操作
例如RDD {(1, 2), (3, 4), (3, 6)}进行reduce

scala版本

 var mapRDD = sc.parallelize(List((1,2),(3,4),(3,6)))var reduceRDD = mapRDD.reduceByKey((x,y)=>x+y)reduceRDD.foreach(x=>println(x))
------输出---------
(1,2)
(3,10)

再举例
单词计数
F:\sparktest\sample.txt中的内容如下

aa bb cc aa aa aa dd dd ee ee ee ee
ff aa bb zks
ee kks
ee  zz zks

scala版本

   val lines = sc.textFile("F:\\sparktest\\sample.txt")val wordsRDD = lines.flatMap(x=>x.split(" ")).map(x=>(x,1))val wordCountRDD = wordsRDD.reduceByKey((x,y)=>x+y)wordCountRDD.foreach(x=>println(x))
---------输出-----------
(ee,6)
(aa,5)
(dd,2)
(zz,1)
(zks,2)
(kks,1)
(ff,1)
(bb,2)
(cc,1)

java版本

public class reduceByKeyRDDJava {public static void main(String[] args) {SparkConf conf = new SparkConf().setMaster("local").setAppName("reduceByKeyJava");JavaSparkContext sc = new JavaSparkContext(conf);JavaRDD<String> words = sc.textFile("in/word.txt");JavaPairRDD<String, Integer> wordsPairRDD = words.flatMapToPair(new PairFlatMapFunction<String, String, Integer>() {@Overridepublic Iterator<Tuple2<String, Integer>> call(String s) throws Exception {ArrayList<Tuple2<String, Integer>> tpLists = new ArrayList<>();String[] split = s.split(" ");for (String str :split) {Tuple2<String, Integer> tup2 = new Tuple2<>(str, 1);tpLists.add(tup2);}return tpLists.iterator();}});JavaPairRDD<String, Integer> wordCount = wordsPairRDD.reduceByKey(new Function2<Integer, Integer, Integer>() {@Overridepublic Integer call(Integer v1, Integer v2) throws Exception {return v1 + v2;}});Map<String, Integer> collectAsMap = wordCount.collectAsMap();for (String key :collectAsMap.keySet()) {System.out.println("("+key+","+collectAsMap.get(key)+")");}}
}

2. foldByKey

def foldByKey(zeroValue: V)(func: (V, V) => V): RDD[(K, V)]def foldByKey(zeroValue: V, numPartitions: Int)(func: (V, V) => V): RDD[(K, V)]def foldByKey(zeroValue: V, partitioner: Partitioner)(func: (V, V) => V): RDD[(K, V)]

该函数用于RDD[K,V]根据K将V做折叠、合并处理,其中的参数zeroValue表示先根据映射函数将zeroValue应用于V,进行初始化V,再将映射函数应用于初始化后的V.
与reduce不同的是 foldByKey开始折叠的第一个元素不是集合中的第一个元素,而是传入的一个元素

scala版本

object foldByKeyRDDScala {def main(args: Array[String]): Unit = {val conf= new SparkConf().setAppName("foldByKeyScala").setMaster("local")val sc= new SparkContext(conf)val rdd1: RDD[(String, Int)]=sc.parallelize(List(("A", 2), ("A", 3), ("B", 5), ("B", 8)))rdd1.foldByKey(0)((x,y)=>{println("one:"+x+"two:"+y);x+y}).collect.foreach(println)}
}

3. SortByKey

 def sortByKey(ascending : scala.Boolean = { /* compiled code */ }, numPartitions : scala.Int = { /* compiled code */ }) : org.apache.spark.rdd.RDD[scala.Tuple2[K, V]] = { /* compiled code */ }

SortByKey用于对pairRDD按照key进行排序,第一个参数可以设置true或者false,默认是true

scala版本

scala> val rdd = sc.parallelize(Array((3, 4),(1, 2),(4,4),(2,5), (6,5), (5, 6)))  // sortByKey不是Action操作,只能算是转换操作
scala> rdd.sortByKey()
res9: org.apache.spark.rdd.RDD[(Int, Int)] = ShuffledRDD[28] at sortByKey at <console>:24 //看看sortByKey后是什么类型
scala> rdd.sortByKey().collect()
res10: Array[(Int, Int)] = Array((1,2), (2,5), (3,4), (4,4), (5,6), (6,5)) //降序排序
scala> rdd.sortByKey(false).collect()
res12: Array[(Int, Int)] = Array((6,5), (5,6), (4,4), (3,4), (2,5), (1,2))

java版本

public class sortByKeyRDDJava {public static void main(String[] args) {SparkConf conf = new SparkConf().setAppName("sortByKeyJava").setMaster("local");JavaSparkContext sc = new JavaSparkContext(conf);ArrayList<Tuple2<Integer,String>> list = new ArrayList<>();list.add(new Tuple2<>(5,"hello"));list.add(new Tuple2<>(4,"world"));list.add(new Tuple2<>(3,"spark"));list.add(new Tuple2<>(2,"scala"));list.add(new Tuple2<>(1,"china"));JavaRDD<Tuple2<Integer, String>> rdd1 = sc.parallelize(list);PairFunction<Tuple2<Integer, String>, Integer, String> pairFunction = new PairFunction<Tuple2<Integer, String>, Integer, String>() {@Overridepublic Tuple2<Integer, String> call(Tuple2<Integer, String> tup2) throws Exception {System.out.println("PairFunction" + tup2._1 + " " + tup2._2);return tup2;}};JavaPairRDD<Integer, String> integerStringJavaPairRDD = rdd1.mapToPair(pairFunction);JavaPairRDD<Integer, String> integerStringJavaPairRDD1 = integerStringJavaPairRDD.sortByKey(true);List<Tuple2<Integer, String>> collect = integerStringJavaPairRDD1.collect();for (Tuple2 tp2 :collect) {System.out.println(tp2);}}
}

4. groupByKey

def groupByKey(): RDD[(K, Iterable[V])]def groupByKey(numPartitions: Int): RDD[(K, Iterable[V])]def groupByKey(partitioner: Partitioner): RDD[(K, Iterable[V])]

groupByKey会将RDD[key,value] 按照相同的key进行分组,形成RDD[key,Iterable[value]]的形式, 有点类似于sql中的groupby,例如类似于mysql中的group_concat
例如这个例子, 我们对学生的成绩进行分组

scala版本

object groupByKeyScala {def main(args: Array[String]): Unit = {val conf = new SparkConf().setAppName("groupByKeyScala").setMaster("local")val sc = new SparkContext(conf)val scoreDetail = sc.parallelize(List(("xiaohei", 67),("xiaohei", 98),("xiaomin", 45),("xiaomin", 69),("xiaozi", 75),("xiaozi", 64),("xiaolan", 85),("xiaolan", 96)))scoreDetail.groupByKey().collect.foreach(println)}
}

java版本

public class groupByKeyJava {public static void main(String[] args) {SparkConf conf = new SparkConf().setMaster("local").setAppName("groupByKeyJava");JavaSparkContext sc = new JavaSparkContext(conf);JavaRDD<Tuple2<String,Integer>> scoreDetail = sc.parallelize(Arrays.asList(new Tuple2("xiaohei", 67),new Tuple2("xiaohei", 98),new Tuple2("xiaomin", 45),new Tuple2("xiaomin", 69),new Tuple2("xiaozi", 75),new Tuple2("xiaozi", 64),new Tuple2("xiaolan", 85),new Tuple2("xiaolan", 96)));JavaPairRDD<String,Integer> scoreMapRDD = JavaPairRDD.fromJavaRDD(scoreDetail);Map<String, Iterable<Integer>> resultMap = scoreMapRDD.groupByKey().collectAsMap();for (String key :resultMap.keySet()) {System.out.println("("+key+","+resultMap.get(key)+")");}}
}

5. cogroup

groupByKey是对单个 RDD 的数据进行分组,还可以使用一个叫作 cogroup() 的函数对多个共享同一个键的 RDD 进行分组
例如
RDD1.cogroup(RDD2) 会将RDD1和RDD2按照相同的key进行分组,得到(key,RDD[key,Iterable[value1],Iterable[value2]])的形式
cogroup也可以多个进行分组
例如RDD1.cogroup(RDD2,RDD3,…RDDN), 可以得到(key,Iterable[value1],Iterable[value2],Iterable[value3],…,Iterable[valueN])
案例,scoreDetail存放的是学生的优秀学科的分数,scoreDetai2存放的是刚刚及格的分数,scoreDetai3存放的是没有及格的科目的分数,我们要对每一个学生的优秀学科,刚及格和不及格的分数给分组统计出来

scala版本

object cogroupRDDScala {def main(args: Array[String]): Unit = {val conf = new SparkConf().setAppName("cogroupScala").setMaster("local")val sc = new SparkContext(conf)val score1 = sc.parallelize(List(("xiaomin",97),("xiaomin",90),("lihua",98),("lihua",76)))val score2 = sc.parallelize(List(("xiaomin",34),("lihua",56),("lihua",73),("xiaofeng",86)))val score3 = sc.parallelize(List(("xiaofeng",78),("lihua",90),("lihua",73),("xiaofeng",86)))score1.cogroup(score2,score3).foreach(println)}
}

java版本

public class cogroupRDDJava {public static void main(String[] args) {SparkConf conf = new SparkConf().setAppName("cogroupJava").setMaster("local");JavaSparkContext sc = new JavaSparkContext(conf);JavaRDD<Tuple2<String,Integer>> score1 = sc.parallelize(Arrays.asList(new Tuple2("xiaoming", 98),new Tuple2("xiaoming", 93),new Tuple2("xiaobai", 88),new Tuple2("xiaobai", 58),new Tuple2("xiaolan", 58),new Tuple2("xiaolan", 68)));JavaRDD<Tuple2<String,Integer>> score2 = sc.parallelize(Arrays.asList(new Tuple2("xiaolan", 56),new Tuple2("xiaolan", 67),new Tuple2("xiaoming", 58),new Tuple2("xiaoming", 38),new Tuple2("xiaobai", 78),new Tuple2("xiaobai", 48)));JavaRDD<Tuple2<String,Integer>> score3 = sc.parallelize(Arrays.asList(new Tuple2("xiaozi", 56),new Tuple2("xiaozi", 67),new Tuple2("xiaoming", 18),new Tuple2("xiaoming", 28),new Tuple2("xiaobai", 58),new Tuple2("xiaobai", 28)));JavaPairRDD<String,Integer > scoreMapRDD1 = JavaPairRDD.fromJavaRDD(score1);JavaPairRDD<String,Integer > scoreMapRDD2 = JavaPairRDD.fromJavaRDD(score2);JavaPairRDD<String,Integer > scoreMapRDD3 = JavaPairRDD.fromJavaRDD(score3);JavaPairRDD<String, Tuple3<Iterable<Integer>, Iterable<Integer>, Iterable<Integer>>> cogroupRDD= (JavaPairRDD<String, Tuple3<Iterable<Integer>, Iterable<Integer>, Iterable<Integer>>>)scoreMapRDD1.cogroup(scoreMapRDD2, scoreMapRDD3);Map<String, Tuple3<Iterable<Integer>, Iterable<Integer>, Iterable<Integer>>> Tuple3Map = cogroupRDD.collectAsMap();for (String key :Tuple3Map.keySet()) {System.out.println("("+key+","+Tuple3Map.get(key)+")");}}
}

6. subtractByKey

函数定义

def subtractByKey[W](other: RDD[(K, W)])(implicit arg0: ClassTag[W]): RDD[(K, V)]def subtractByKey[W](other: RDD[(K, W)], numPartitions: Int)(implicit arg0: ClassTag[W]): RDD[(K, V)]def subtractByKey[W](other: RDD[(K, W)], p: Partitioner)(implicit arg0: ClassTag[W]): RDD[(K, V)]

类似于subtrac,删掉 RDD 中键与 other RDD 中的键相同的元素

7. join,fullOuterJoin, rightOuterJoin, leftOuterJoin

scala版本

object KVguanjian {def main(args: Array[String]): Unit = {val conf = new SparkConf().setMaster("local").setAppName("KVguanjian")val sc = new SparkContext(conf)val rdd = sc.makeRDD(Array((1,2),(3,4),(5,6)))val other = sc.makeRDD(Array((3,9)))//rdd.subtractByKey(other).collect.foreach(println)//rdd.join(other).collect.foreach(println)//rdd.leftOuterJoin(other).collect.foreach(println)rdd.rightOuterJoin(other).collect.foreach(println)}
}

java版本

import org.apache.spark.SparkConf;
import org.apache.spark.api.java.JavaPairRDD;
import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.api.java.JavaSparkContext;
import org.apache.spark.api.java.Optional;
import scala.Tuple2;
import java.util.Arrays;
import java.util.Map;public class KVguanjianJava {public static void main(String[] args) {SparkConf conf = new SparkConf().setMaster("local").setAppName("KVguanjianJava");JavaSparkContext sc = new JavaSparkContext(conf);JavaRDD<Tuple2<Integer,Integer>> rddPre = sc.parallelize(Arrays.asList(new Tuple2(1, 2),new Tuple2(3, 4),new Tuple2(5, 6)));JavaRDD<Tuple2<Integer,Integer>> otherPre = sc.parallelize(Arrays.asList(new Tuple2(3, 10),new Tuple2(4, 8)));//JavaRDD转换成JavaPairRDDJavaPairRDD<Integer, Integer> rdd = JavaPairRDD.fromJavaRDD(rddPre);JavaPairRDD<Integer, Integer> other = JavaPairRDD.fromJavaRDD(otherPre);JavaPairRDD<Integer, Integer> subtractRDD = rdd.subtractByKey(other);JavaPairRDD<Integer, Tuple2<Integer, Integer>> joinRDD = (JavaPairRDD<Integer, Tuple2<Integer, Integer>>) rdd.join(other);JavaPairRDD<Integer, Tuple2<Optional<Integer>, Optional<Integer>>> fullOutJoinRDD = (JavaPairRDD<Integer, Tuple2<Optional<Integer>, Optional<Integer>>>) rdd.fullOuterJoin(other);JavaPairRDD<Integer, Tuple2<Integer, Optional<Integer>>> leftOutJoinRDD = (JavaPairRDD<Integer, Tuple2<Integer, Optional<Integer>>>) rdd.leftOuterJoin(other);JavaPairRDD<Integer, Tuple2<Optional<Integer>, Integer>> rightOuterJoinRDD = (JavaPairRDD<Integer, Tuple2<Optional<Integer>, Integer>>) rdd.rightOuterJoin(other);Map<Integer, Integer> subMap = subtractRDD.collectAsMap();System.out.println("----------subRDD");for (Integer key :subMap.keySet()) {System.out.println("subRDD:"+key+","+subMap.get(key));}Map<Integer, Tuple2<Integer, Integer>> joinMap = joinRDD.collectAsMap();System.out.println("----------joinMap");for (Integer key :joinMap.keySet()) {System.out.println("joinMap:"+key+","+joinMap.get(key));}Map<Integer, Tuple2<Optional<Integer>, Optional<Integer>>> fullOutJoinMap = fullOutJoinRDD.collectAsMap();System.out.println("----------fullOutJoinMap");for (Integer key :fullOutJoinMap.keySet()) {System.out.println("fullOutJoinMap:"+key+","+fullOutJoinMap.get(key));}Map<Integer, Tuple2<Integer, Optional<Integer>>> leftOutJoinMap = leftOutJoinRDD.collectAsMap();System.out.println("----------leftOutJoinMap");for (Integer key :leftOutJoinMap.keySet()) {System.out.println("leftOutJoinMap:"+key+","+leftOutJoinMap.get(key));}Map<Integer, Tuple2<Optional<Integer>, Integer>> rightOuterJoinMap = rightOuterJoinRDD.collectAsMap();System.out.println("----------rightOutJoinMap");for (Integer key:rightOuterJoinMap.keySet()) {System.out.println("rightOuterJoinRDD:"+key+","+rightOuterJoinMap.get(key));}}
}

SparkRDD算子(三)键值对聚合操作(reduceByKey,foldByKey,sortByKey, join)相关推荐

  1. Spark 键值对RDD操作

    https://www.cnblogs.com/yongjian/p/6425772.html 概述 键值对RDD是Spark操作中最常用的RDD,它是很多程序的构成要素,因为他们提供了并行操作各个键 ...

  2. 【Redis】Redis 哈希 Hash 键值对集合操作 ( 哈希 Hash 键值对集合简介 | 查询操作 | 增加操作 | 修改操作 )

    文章目录 一.哈希 Hash 键值对集合 二.查询操作 1.Redis 中查询 Hash 键值对数据 2.查询 Hash 键是否存在 3.查询 Hash 中所有的键 Field 4.查询 Hash 中 ...

  3. php中数组的指针函数参数传递参数,循环语句、函数的参数及作用域、数组键值及指针操作函数(8月23日作业)...

    实例演示while(),do~while() 实例 /** * while循环 */ $num = 1; $sum = 0; while ($num <= 100) { $sum +=$num; ...

  4. vue如何获取数组中的键值_vue中操作数组的相关方法

    1,锁定数组的长度(只读模式)[ Array.join() ] 2.将数组合并成字符串(返回字符串)[ Array.join() ] 3.返回逆序数组(倒叙排列数组)[ Array..reverse( ...

  5. spark编程基础--5.2键值对RDD

    键值对RDD的创建 常用的键值对转换操作 reduceByKey(func) groupByKey() keys values sortByKey() mapValues(func) join com ...

  6. Flink 状态管理:算子状态、键值分区状态、状态后端、有状态算子的扩缩容

    文章目录 状态管理 算子状态 键值分区状态 状态后端(State Backends) 有状态算子的扩缩容 状态管理 通常意义上,函数里所有需要任务去维护并用来计算结果的数据都属于任务的状态,可以把状态 ...

  7. 字典删除多个键值对方法_Life is short,you need Python——Python序列(元组、字典、集合)...

    一.元组 tuple 列表属于可变序列,可以任意修改列表中的元素. 元组属于不可变序列,不能修改元组中的元素.因此,元组没有增加元素.修改元素.删除元素相关的方法. 下面只介绍元组的创建和删除,元组中 ...

  8. 注册表REG文件编写实例(创建、删除、添加、更改键值)

    转载自:http://www.newxing.com/Tech/Soft/system/84.html Windows 中的注册表文件( system.dat 和 user.dat )是 Window ...

  9. JDK1.8聚合操作

    在java8 JDK包含许多聚合操作(如平均值,总和,最小,最大,和计数),返回一个计算流stream的聚合结果.这些聚合操作被称为聚合操作.JDK除返回单个值的聚合操作外,还有很多聚合操作返回一个c ...

  10. C/C++注册表【4】键值的获取,设置,删除,枚举

    C/C++注册表[4]键值的获取,设置,删除,枚举 1.键值的获取: LONG WINAPI RegQueryValueEx(HKEY hKey, //一个已打开项的句柄,或者指定一个标准项名LPCT ...

最新文章

  1. RTX2013和微信企业号打通
  2. mysql有类似dbms_output.pu_line();_使用MySQL,SQL_MODE有哪些坑,你知道么?
  3. net 控制台 定时_.NET Core实现基于Quart.Net的任务管理
  4. java中main函数解析
  5. python新闻评论分析_从新闻文章中提取评论
  6. 技术员联盟win11旗舰版64位镜像v2021.07
  7. 20154319 实验九web安全基础实践
  8. 蒟蒻的HNOI2017滚粗记
  9. 360与Bing合作上线英文搜索
  10. oracle 10g下载百度云地址
  11. strcmp函数的实现
  12. JPEG 图像压缩原理
  13. Python 常用写法
  14. 【Python学习】(9)[Errno 2]No such file or directory:'calibri.ttf'
  15. 机器学习算法——手动搭建决策树分类器(代码+作图)
  16. 20多个可以提高你安卓开发技能的开源app
  17. sap月结问题之-ckmlpp物料帐期问题。
  18. 华为手机备份的通讯录是什么文件_华为手机资料备份与恢复教程(含联系人短信图片程序等)...
  19. android美柚日历控件,仿美柚大姨妈日历
  20. 【飞控开发基础教程9】疯壳·开源编队无人机-PWM(电机控制)

热门文章

  1. 所谓的成长就是认知升级-成长就是应付自如
  2. Day15——Huffman编码之构建Huffman树
  3. Java IDE漫谈(一)
  4. 阿里云域名注册+服务器购买+备案教程
  5. 上海电信路由器有ipv6,电脑无法获取ipv6问题记录
  6. mysql中ddl是什么_mysql ddl什么意思
  7. Unity使用脚本动态修改材质球的颜色
  8. svchost.exe程序下载解决方法或者在360中看到svchost.exe占网速
  9. python django 基本测试 及调试 201812
  10. 宋江是怎么当上老大的