Spark支持两种RDD操作:transformation和action 
在本文中,将对几个常用的transformation算子进行案例演示,采用Java和Scala两种语言对代码进行编写 
其中,在Java版本中,将对transformation算子进行详细介绍

transformation常用算子介绍

 

Java版本

@SuppressWarnings(value = {"unused"})
public class TransformationOperation {public static void main(String[] args) {
//      map();
//      filter();
//      flatMap();
//      groupByKey();
//      reduceByKey();
//      sortByKey();
//      join();cogroup();}/*** map算子案例:将集合中每一个元素都乘以2*/private static void map(){// 创建SparkConfSparkConf conf = new SparkConf().setAppName("map").setMaster("local");// 创建JavaSparkContextJavaSparkContext sc = new JavaSparkContext(conf);// 构造集合List<Integer> numbers = Arrays.asList(1, 2, 3, 4, 5);// 并行化集合,创建初始RDDJavaRDD<Integer> numberRDD = sc.parallelize(numbers);// 使用map算子,将集合中的每个元素都乘以2// map算子,是对任何类型的RDD,都可以调用的// 在Java中,map算子接收的参数是Function对象// 创建的Function对象,一定会让你设置第二个泛型参数,这个泛型类型,就是返回的新元素的类型// 同时call()方法的返回类型,也必须与第二个泛型类型同步// 在call()方法内部,就可以对原始RDD中的每一个元素进行各种处理和计算,并返回一个新的元素// 所有新的元素就会组成一个新的RDDJavaRDD<Integer> multipleNumberRDD = numberRDD.map(new Function<Integer, Integer>() {private static final long serialVersionUID = 1L;// 传入call()方法的,就是1,2,3,4,5// 返回的就是2,4,6,8,10@Overridepublic Integer call(Integer v1) throws Exception {return v1 * 2;}});// 打印新的RDDmultipleNumberRDD.foreach(new VoidFunction<Integer>() {private static final long serialVersionUID = 1L;@Overridepublic void call(Integer t) throws Exception {System.out.println(t);}});// 关闭JavaSparkContextsc.close();}/*** filter算子案例:过滤集合中的偶数*/private static void filter(){// 创建SparkConfSparkConf conf = new SparkConf().setAppName("fliter").setMaster("local");// 创建JavaSparkContextJavaSparkContext sc = new JavaSparkContext(conf);// 模拟集合List<Integer> numbers = Arrays.asList(1, 2, 3, 4, 5, 6, 7, 8, 9, 10);// 并行化集合,创建初始RDDJavaRDD<Integer> numberRDD = sc.parallelize(numbers);// 对初始RDD执行了filter算子,过滤出其中的偶数// filter算子,传入的也是Function,其他的使用注意点,实际上和map是一样的// 但是,唯一的不同,就是call()方法的返回类型是Boolean// 每一个初始RDD中的元素,都会传入call()方法,此时你可以执行各种自定义计算逻辑// 来判断这个元素是否是你想要的// 如果你想在新的RDD中保留这个元素,那么就返回true;否则,不想保留这个元素,返回falseJavaRDD<Integer> evenNumberRDD = numberRDD.filter(new Function<Integer, Boolean>() {private static final long serialVersionUID = 1L;// 在这里,1到10,都会传入进来// 但是根据我们的逻辑,只有2,4,6,8,10这几个偶数,会返回true// 所以,只有偶数会保留下来,放在新的RDD中@Overridepublic Boolean call(Integer v1) throws Exception {return v1 % 2 ==0;}});// 打印新的RDDevenNumberRDD.foreach(new VoidFunction<Integer>() {private static final long serialVersionUID = 1L;@Overridepublic void call(Integer t) throws Exception {System.out.println(t);}});// 关闭JavaSparkContextsc.close();}/*** flatMap案例:将文本行拆分为多个单词* 注:flatMap与map的区别*    map会针对每一条输入进行指定的操作,然后为每一条输入返回一个对象*    flatMap有两个操作:操作1==>同map函数一样,对每一条输入进行指定的操作,然后为每一条输入返回一个对象*                     操作2==>最后将所有对象合并未一个对象*/private static void flatMap(){// 创建SparkConfSparkConf conf = new SparkConf().setAppName("flatMap").setMaster("local");// 创建JavaSparkContextJavaSparkContext sc = new JavaSparkContext(conf);// 构造集合List<String> lineList = Arrays.asList("hello you", "hello me", "hello world");// 并行化集合,创建RDDJavaRDD<String> lines = sc.parallelize(lineList);// 对RDD执行flatMap算子,将每一行文本,拆分为多个单词// flatMap算子,在Java中,接收的参数FlatMapFunction// 我们需要自己定义FlatMapFunction的第二个泛型类型,即,代表了返回的新元素的类型// call()方法,返回的类型,不是U,而是Iterable<U>,这里的U也与第二个泛型类型相同// flatMap其实就是,接收原始RDD中的每个元素,并进行各种逻辑的计算和处理,返回可以返回// 多个元素,即封装在Iterable集合中,可以使用ArrayList等集合// 新的RDD中,即封装了所有的新元素;也就是说,新的RDD的大小一定是 >= 原始RDD的大小JavaRDD<String> words = lines.flatMap(new FlatMapFunction<String, String>() {private static final long serialVersionUID = 1L;@Overridepublic Iterable<String> call(String t) throws Exception {return Arrays.asList(t.split(" "));}});// 打印新的RDDwords.foreach(new VoidFunction<String>() {private static final long serialVersionUID = 1L;@Overridepublic void call(String t) throws Exception {System.out.println(t);}});// 关闭JavaSparkContextsc.close();}/*** groupByKey案例:按照班级对成绩进行分组*/private static void groupByKey(){// 创建SparkConfSparkConf conf = new SparkConf().setAppName("groupByKey").setMaster("local");// 创建JavaSparkContextJavaSparkContext sc = new JavaSparkContext(conf);// 模拟集合@SuppressWarnings("unchecked")List<Tuple2<String, Integer>> scoreList = Arrays.asList(new Tuple2<String, Integer>("class1", 80),new Tuple2<String, Integer>("class2", 75),new Tuple2<String, Integer>("class1", 90),new Tuple2<String, Integer>("class2", 65));// 并行化集合,创建JavaPairRDD// 注意:这里使用的是SparkContext的parallelizePairs()方法去创建JavaPairRDD// 与之前创建JavaRDD的方式有所不同JavaPairRDD<String, Integer> scores = sc.parallelizePairs(scoreList);// 针对scores RDD,执行groupByKey算子,对每个班级的成绩进行分组// groupByKey算子,返回的还是JavaPairRDD// 但是,JavaPairRDD的第一个泛型类型不变,第二个泛型类型变成Iterable这种集合类型// 也就是说,按照了key进行分组,那么每个key可能都会有多个value,此时多个value聚合成了Iterable// 那么接下来,我们是不是就可以通过groupedScores这种JavaPairRDD,很方便地处理某个分组内的数据JavaPairRDD<String, Iterable<Integer>> groupedScores = scores.groupByKey();// 打印groupedScores RDDgroupedScores.foreach(new VoidFunction<Tuple2<String,Iterable<Integer>>>() {private static final long serialVersionUID = 1L;@Overridepublic void call(Tuple2<String, Iterable<Integer>> t) throws Exception {System.out.println("class:" + t._1);Iterator<Integer> ite = t._2.iterator();while (ite.hasNext()) {System.out.println(ite.next());}System.out.println("====================");}});// 关闭JavaSparkContextsc.close();}/*** reduceByKey案例:统计每个班级的总分*/private static void reduceByKey(){// 创建SparkConfSparkConf conf = new SparkConf().setAppName("reduceByKey").setMaster("local");// 创建JavaSparkContextJavaSparkContext sc = new JavaSparkContext(conf);// 模拟集合@SuppressWarnings("unchecked")List<Tuple2<String, Integer>> scoreList = Arrays.asList(new Tuple2<String, Integer>("class1", 80),new Tuple2<String, Integer>("class2", 75),new Tuple2<String, Integer>("class1", 90),new Tuple2<String, Integer>("class2", 65));// 并行化集合,创建JavaPairRDDJavaPairRDD<String, Integer> scores = sc.parallelizePairs(scoreList);// 针对scores RDD,执行reduceByKey算子// reduceByKey,接收的参数是Function2类型,它有三个泛型参数,实际上代表了三个值// 第一个泛型类型和第二个泛型类型,代表了原始RDD中的元素的value的类型// 因此对每个key进行reduce,都会依次将第一个、第二个value传入,算出一个值之后,再传入第三个value// 因此此处,会自动定义两个泛型类型,代表call()方法的两个传入参数的类型// 第三个泛型类型,代表了每次reduce操作返回值的类型,默认也是与原始RDD的value类型相同的// reduceByKey算法返回的RDD,还是JavaPairRDD<key, value>JavaPairRDD<String, Integer> totalScores = scores.reduceByKey(new Function2<Integer, Integer, Integer>() {private static final long serialVersionUID = 1L;// 对每个key,都会将其value,依次传入call方法// 从而聚合出每个key对应的一个value// 然后,将每个key对应的一个value,组合成一个Tuple2。作为RDD的新元素@Overridepublic Integer call(Integer v1, Integer v2) throws Exception {return v1 + v2;}});// 打印totalScores RDDtotalScores.foreach(new VoidFunction<Tuple2<String,Integer>>() {private static final long serialVersionUID = 1L;@Overridepublic void call(Tuple2<String, Integer> t) throws Exception {System.out.println(t._1 + ":" + t._2);}});// 关闭JavaSparkContextsc.close();}/*** sortByKey操作:按照学生分数进行排序*/private static void sortByKey(){// 创建SparkConfSparkConf conf = new SparkConf().setAppName("sortByKey").setMaster("local");// 创建JavaSparkContextJavaSparkContext sc = new JavaSparkContext(conf);// 模拟集合@SuppressWarnings("unchecked")List<Tuple2<Integer, String>> scoreList = Arrays.asList(new Tuple2<Integer, String>(65, "leo"),new Tuple2<Integer, String>(50, "tom"),new Tuple2<Integer, String>(100, "marry"),new Tuple2<Integer, String>(80, "jack"));// 并行化集合,创建RDDJavaPairRDD<Integer, String> scores = sc.parallelizePairs(scoreList);// 对scores RDD执行sortByKey算子// sortByKey其实就是根据key进行排序,可以手动指定升序或者降序// false表示从大到小排列;true表示从小到大排列// 返回的,还是JavaPairRDD,其中的元素内容,都是和原始的RDD一模一样的// 但是就是RDD中的元素的顺序,不同了JavaPairRDD<Integer, String> sortedScores = scores.sortByKey(false);// 打印sortedScored RDDsortedScores.foreach(new VoidFunction<Tuple2<Integer,String>>() {private static final long serialVersionUID = 1L;@Overridepublic void call(Tuple2<Integer, String> t) throws Exception {System.out.println(t._1 + ": " + t._2);}});// 关闭JavaSparkContextsc.close();}/*** join案例:打印学生成绩*/private static void join(){// 创建SparkConfSparkConf conf = new SparkConf().setAppName("join").setMaster("local");// 创建JavaSparkContextJavaSparkContext sc = new JavaSparkContext(conf);// 模拟集合@SuppressWarnings("unchecked")List<Tuple2<Integer, String>> studentList = Arrays.asList(new Tuple2<Integer, String>(1, "leo"),new Tuple2<Integer, String>(2, "jack"),new Tuple2<Integer, String>(3, "tom"));@SuppressWarnings("unchecked")List<Tuple2<Integer, Integer>> scoreList = Arrays.asList(new Tuple2<Integer, Integer>(1, 100),new Tuple2<Integer, Integer>(2, 90),new Tuple2<Integer, Integer>(3, 60));// 并行化两个RDDJavaPairRDD<Integer, String> students = sc.parallelizePairs(studentList);JavaPairRDD<Integer, Integer> scores = sc.parallelizePairs(scoreList);// 使用join算子关联两个RDD// join以后,还是会根据key进行join,并返回JavaPairRDD// 但是JavaPairRDD的第一个泛型类型,是之前两个JavaPairRDD都有的key类型,因为是该算子是通过key进行join的// 第二个泛型类型,是Tuple2<v1, v2>的类型,Tuple2的两个泛型分别为两个原始RDD的value的类型// join,就返回RDD的每一个元素,就是通过key join上的一个pair// 比如有(1,1)  (1,2)  (1,3)的一个RDD// 还有一个(1,4)  (2,1)  (2,2)的一个RDD// join以后,实际上会得到(1 (1,4))  (1 (2,4))  (1 (3,4))// 不会产生(2 (x,x))这样形式的,因为两个RDD中,通过join算子,只会join相同的key的RDD;// 而这两个RDD中,只有1这个key值是上下都有的,因此join之后,产生了上述的结果JavaPairRDD<Integer, Tuple2<String, Integer>> studentScores = students.join(scores);// 打印studentScores RDDstudentScores.foreach(new VoidFunction<Tuple2<Integer,Tuple2<String,Integer>>>() {private static final long serialVersionUID = 1L;@Overridepublic void call(Tuple2<Integer, Tuple2<String, Integer>> t) throws Exception {System.out.println("student id: " + t._1);System.out.println("student name: " + t._2._1);System.out.println("student score: " + t._2._2);System.out.println("=========================");}});// 关闭JavaSparkContsc.close();}/*** cogroup案例:打印学生成绩*/private static void cogroup(){// 创建SparkConfSparkConf conf = new SparkConf().setAppName("cogroup").setMaster("local");// 创建JavaSparkContextJavaSparkContext sc = new JavaSparkContext(conf);// 模拟集合@SuppressWarnings("unchecked")List<Tuple2<Integer, String>> studentList = Arrays.asList(new Tuple2<Integer, String>(1, "leo"),new Tuple2<Integer, String>(2, "jack"),new Tuple2<Integer, String>(3, "tom"));@SuppressWarnings("unchecked")List<Tuple2<Integer, Integer>> scoreList = Arrays.asList(new Tuple2<Integer, Integer>(1, 100),new Tuple2<Integer, Integer>(2, 90),new Tuple2<Integer, Integer>(3, 60),new Tuple2<Integer, Integer>(1, 70),new Tuple2<Integer, Integer>(2, 80),new Tuple2<Integer, Integer>(1, 70),new Tuple2<Integer, Integer>(3, 50));// 并行化两个RDDJavaPairRDD<Integer, String> students = sc.parallelizePairs(studentList);JavaPairRDD<Integer, Integer> scores = sc.parallelizePairs(scoreList);// cogroup与join不用// 相当于是,一个key join上的所有value,都给放到一个Itreable里面去了// 而在join算子中,则不会Iterable里面,会一组一组的打印在控制台上JavaPairRDD<Integer, Tuple2<Iterable<String>, Iterable<Integer>>> studentScores =students.cogroup(scores);// 打印studentScores RDDstudentScores.foreach(new VoidFunction<Tuple2<Integer,Tuple2<Iterable<String>,Iterable<Integer>>>>() {private static final long serialVersionUID = 1L;@Overridepublic void call(Tuple2<Integer, Tuple2<Iterable<String>, Iterable<Integer>>> t) throws Exception {System.out.println("student id: " + t._1);System.out.println("student name: " + t._2._1);System.out.println("student score: " + t._2._2);System.out.println("=========================");}});// 关闭JavaSparkContextsc.close();}
}

Scala版本

object TransformationOperation {def main(args: Array[String]): Unit = {
//    map()
//    filter()
//    flatMap()
//    groupByKey()
//    reduceByKey()
//    sortByKey()
//    join()cogroup()}def map(): Unit = {val conf = new SparkConf().setAppName("map").setMaster("local")val sc = new SparkContext(conf)val numbers = Array(1, 2, 3, 4, 5)val numberRDD = sc.parallelize(numbers, 1)val multipleNumberRDD = numberRDD.map { num => num*2 }multipleNumberRDD.foreach { num => println(num) }}def filter(): Unit = {val conf = new SparkConf().setAppName("map").setMaster("local")val sc = new SparkContext(conf)val numbers = Array(1, 2, 3, 4, 5, 6, 7, 8, 9, 10)val numbersRDD = sc.parallelize(numbers, 1)val evenNumberRDD = numbersRDD.filter { num => num % 2 == 0 }evenNumberRDD.foreach { num => println(num) }}def flatMap(): Unit = {val conf = new SparkConf().setAppName("flatMap").setMaster("local")val sc = new SparkContext(conf)val lineArray = Array("hello you", "hello me", "hello world")val lines = sc.parallelize(lineArray, 1)val words = lines.flatMap { line => line.split(" ") }words.foreach { word => println(word) }}def groupByKey(): Unit = {val conf = new SparkConf().setAppName("groupByKey").setMaster("local")val sc = new SparkContext(conf)val scoreList = Array(Tuple2("class1",80),Tuple2("class2",75),Tuple2("class1",90),Tuple2("class2",60))val scores = sc.parallelize(scoreList, 1)val groupedScores = scores.groupByKey()groupedScores.foreach(score => {println(score._1)score._2.foreach { singleScore => println(singleScore)}println("===================")})}def reduceByKey(): Unit = {val conf = new SparkConf().setAppName("reduceByKey").setMaster("local")val sc = new SparkContext(conf)val scoreList = Array(Tuple2("class1",80),Tuple2("class2",75),Tuple2("class1",90),Tuple2("class2",60))val scores = sc.parallelize(scoreList, 1)val totalScores = scores.reduceByKey(_ + _)totalScores.foreach(classScore => println(classScore._1 + ":" + classScore._2))}def sortByKey(): Unit = {val conf = new SparkConf().setAppName("sortByKey").setMaster("local")val sc = new SparkContext(conf)val scoreList = Array(Tuple2(65,"leo"),Tuple2(50,"tom"),Tuple2(100,"marry"),Tuple2(85,"jack"))val scores = sc.parallelize(scoreList, 1)val sortedScores = scores.sortByKey(false)sortedScores.foreach(studentScore => println(studentScore._1 + ": " + studentScore._2))}def join(): Unit = {val conf = new SparkConf().setAppName("join").setMaster("local")val sc = new SparkContext(conf)val studentList = Array(Tuple2(1,"leo"),Tuple2(2,"jack"),Tuple2(3,"tom"))val scoreList = Array(Tuple2(1,100),Tuple2(2,90),Tuple2(3,60))val students = sc.parallelize(studentList, 1)val scores = sc.parallelize(scoreList, 1)val studentScores = students.join(scores)studentScores.foreach(studentScore => {println("student id:" + studentScore._1)println("student name:" + studentScore._2._1)println("student score:" + studentScore._2._2)println("=================================")})}def cogroup(): Unit = {val conf = new SparkConf().setAppName("cogroup").setMaster("local")val sc = new SparkContext(conf)val studentList = Array(Tuple2(1,"leo"),Tuple2(2,"jack"),Tuple2(3,"tom"))val scoreList = Array(Tuple2(1,100),Tuple2(2,90),Tuple2(3,60),Tuple2(1,70),Tuple2(2,80),Tuple2(1,70),Tuple2(3,50))val students = sc.parallelize(studentList, 1)val scores = sc.parallelize(scoreList, 1)val studentScores = students.cogroup(scores)studentScores.foreach(studentScore => {println("student id:" + studentScore._1)println("student name:" + studentScore._2._1)println("student score:" + studentScore._2._2)println("=================================")})}}

Spark transformation算子案例相关推荐

  1. Spark action算子案例

    在上篇文章中,我们对Spark中几种常用的transformation算子通过Java和Scala两种代码分别进行了案例演示,Spark transformation算子案例  而在本文中,我们将继续 ...

  2. Spark RDD算子(transformation + action)

    概念 RDD(Resilient Distributed Dataset)叫做弹性分布式数据集,是Spark中最基本的数据抽象,它代表一个不可变.可分区.里面的元素可并行计算的集合.RDD具有数据流模 ...

  3. Spark RDD使用详解3--Value型Transformation算子

    处理数据类型为Value型的Transformation算子可以根据RDD变换算子的输入分区与输出分区关系分为以下几种类型: 1)输入分区与输出分区一对一型  2)输入分区与输出分区多对一型  3)输 ...

  4. Spark学习之Spark RDD算子

    个人主页zicesun.com 这里,从源码的角度总结一下Spark RDD算子的用法. 单值型Transformation算子 map /*** Return a new RDD by applyi ...

  5. Spark _30_SparkStreaming算子操作Driver HA

    SparkStreaming算子操作 foreachRDD output operation算子,必须对抽取出来的RDD执行action类算子,代码才能执行. import org.apache.sp ...

  6. RDD之四:Value型Transformation算子

    处理数据类型为Value型的Transformation算子可以根据RDD变换算子的输入分区与输出分区关系分为以下几种类型: 1)输入分区与输出分区一对一型  2)输入分区与输出分区多对一型  3)输 ...

  7. Spark学习之路 (六)Spark Transformation和Action

    Transformation算子 基本的初始化 java static SparkConf conf = null;static JavaSparkContext sc = null;static { ...

  8. 大数据技术之_19_Spark学习_07_Spark 性能调优 + 数据倾斜调优 + 运行资源调优 + 程序开发调优 + Shuffle 调优 + GC 调优 + Spark 企业应用案例

    大数据技术之_19_Spark学习_07 第1章 Spark 性能优化 1.1 调优基本原则 1.1.1 基本概念和原则 1.1.2 性能监控方式 1.1.3 调优要点 1.2 数据倾斜优化 1.2. ...

  9. Spark综合小案例之莎士比亚诗文集词频统计

    教程目录 0x00 教程内容 0x01 数据准备 1. 数据获取 2. 数据内容 0x02 代码实现 1. 启动spark-shell 2. 测试代码 0x03 校验结果 1. 查看是否有统计结果 0 ...

最新文章

  1. 激光雷达与汽车技术路线
  2. 502 Server dropped connection
  3. Grid R-CNN解读:商汤最新目标检测算法,定位精度超越Faster R-CNN
  4. 【面经】字节跳动后端开发视频架构方向一面二面
  5. sort +awk+uniq 统计文件中出现次数最多的前10个单词yes3
  6. django版本区别/与版本匹配
  7. Windows核心思想-宽字符与窄字符(Unicode和ASCII)
  8. 最大最小背光亮度修改
  9. C语言快速学习笔记001-相关语法
  10. 【leetcode】416. Partition Equal Subset Sum
  11. 小学计算机纸牌教案,小学信息技术《玩好纸牌》教案
  12. 租用服务器怎么免去后顾之忧?
  13. Oracle 11g RAC添加一节点过程
  14. 【cuda】——npp/cuda图像预处理resize+norm对比
  15. Debian 10 开启和停止 ufw防火墙
  16. 计算机组装diy,电脑diy,详细教您如何组装电脑
  17. 怎么把pdf文件转换成word免费转换器
  18. 深恶痛绝的No mapping found for HTTP request with URI
  19. win7计算机里不显示摄像头,win7系统不显示摄像头的解决方法
  20. quickhit----快打小游戏

热门文章

  1. matlab hold off没用,matlab中 hold on 与hold off的用法
  2. java 字节码增强原理_深入浅出Java探针技术1--基于java agent的字节码增强案例
  3. arm rtx教程_ARM CMSIS标准概述及快速入门
  4. 嵌入式软件常见笔试面试题总结 .
  5. Java Math的 floor,round和ceil的总结 ,java基础知识
  6. java set iterator_Java中的TreeSet的iterator()方法 Java.util.TreeSet.iterator() - Break易站
  7. 编程实现基于二维易位置换机制进行信息加解密_基于TEE的TBOX安全技术
  8. html5 jquery paint plugin,制作高质量的JQuery Plugin 插件的方法
  9. 孤灯php加密,PHP实现观察者模式
  10. windows mysql 免安装_windows 免安装mysql