一、Transformation和Action

接下来我们详细分析一下Spark中对RDD的操作
Spark对RDD的操作可以整体分为两类:
Transformation和Action
这里的Transformation可以翻译为转换,表示是针对RDD中数据的转换操作,主要会针对已有的RDD创建一个新的RDD:常见的有map、flatMap、filter等等。

Action可以翻译为执行,表示是触发任务执行的操作,主要对RDD进行最后的操作,比如遍历、reduce、保存到文件等,并且还可以把结果返回给Driver程序

不管是Transformation里面的操作还是Action里面的操作,我们一般会把它们称之为算子,例如:map算子、reduce算子

其中Transformation算子有一个特性:lazy
lazy特性在这里指的是,如果一个spark任务中只定义了transformation算子,那么即使你执行这个任务,任务中的算子也不会执行。

也就是说,transformation是不会触发spark任务的执行,它们只是记录了对RDD所做的操作,不会执行。

只有当transformation之后,接着执行了一个action操作,那么所有的transformation才会执行。

Spark通过lazy这种特性,来进行底层的spark任务执行的优化,避免产生过多中间结果。

Action的特性:执行Action操作才会触发一个Spark 任务的运行,从而触发这个Action之前所有的Transformation的执行

以我们的WordCount代码为例:

 //第一步:创建SparkContextval conf = new SparkConf()conf.setAppName("WordCountScala")//设置任务名称//.setMaster("local")//local表示在本地执行val sc = new SparkContext(conf)//第二步:加载数据var path = "D:\\hello.txt"if(args.length==1){path = args(0)}//这里通过textFile()方法,针对外部文件创建了一个RDD,linesRDD,实际上,程序执行到这里为止,hello.txt文件的数据是不会被加载到内存中的。linesRDD只是代表了一个指向hello.txt文件的引用val linesRDD = sc.textFile(path)//第三步:对数据进行切割,把一行数据切分成一个一个的单词
//这里通过flatMap算子对linesRDD进行了转换操作,把每一行数据中的单词切开,获取了一个转换后的wordsRDD,但是由于linesRDD目前是没有数据的,现在不会做任何操作,只是进行了逻辑上的定义而已,最终生成的wordsRDD也只是一个逻辑上的RDD,此时里面并没有任何数据val wordsRDD = linesRDD.flatMap(_.split(" "))//第四步:迭代words,将每个word转化为(word,1)这种形式
//这个操作和前面分析的flatMap的操作是一样的,最终获取了一个逻辑上的pairRDD,此时里面也是没有任何数据的val pairRDD = wordsRDD.map((_,1))//第五步:根据key(其实就是word)进行分组聚合统计
//这个操作也是和前面分析的flatMap操作是一样的,最终获取了一个逻辑上的wordCountRDD,此时里面也是没有任何数据的val wordCountRDD = pairRDD.reduceByKey(_ + _)//第六步:将结果打印到控制台
//这行代码执行了一个action操作,foreach,此时会触发之前所有transformation算子的执行,Spark会将这些算子拆分成多个task发送到多个机器上并行执行,这个foreach算子是没有返回值的,所以不会向Driver进程返回数据,如果是reduce操作,则会向Driver进程返回最终的结果数据。
//注意:只有当任务执行到这一行代码的时候任务才会真正开始执行计算,如果任务中没有这一行代码,前面的所有算子是不会执行的wordCountRDD.foreach(wordCount=>println(wordCount._1+"--"+wordCount._2))//第七步:停止SparkContextsc.stop()

二、常用Transformation介绍

那下面我们先来看一下Spark中的Transformation算子
先来看一下官方文档,进入2.4.3的文档界面


这里面列出了Spark支持的所有的transformation算子


在这里我们先讲一些目前常见的transformation算子,个别transformation算子会在后面针对具体的应用场景分析的时候再涉及

算子           介绍
map         将RDD中的每个元素进行处理,一进一出
filter      对RDD中每个元素进行判断,返回true则保留
flatMap     与map类似,但是每个元素都可以返回一个或多个新元素
groupByKey  根据key进行分组,每个key对应一个Iterable<value>
reduceByKey 对每个相同key对应的value进行reduce操作
sortByKey   对每个相同key对应的value进行排序操作(全局排序)
join        对两个包含<key,value>对的RDD进行join操作
distinct    对RDD中的元素进行全局去重

三、Transformation操作开发实战

下面我们来针对常见的Transformation来具体写一些案例

map:对集合中每个元素乘以2
filter:过滤出集合中的偶数
flatMap:将行拆分为单词
groupByKey:对每个大区的主播进行分组
reduceByKey:统计每个大区的主播数量
sortByKey:对主播的音浪收入排序
join:打印每个主播的大区信息和音浪收入
distinct:统计当天开播的大区信息

1、Scala代码如下:

package com.imooc.scalaimport org.apache.spark.{SparkConf, SparkContext}/*** 需求:Transformation实战* map:对集合中每个元素乘以2* filter:过滤出集合中的偶数* flatMap:将行拆分为单词* groupByKey:对每个大区的主播进行分组* reduceByKey:统计每个大区的主播数量* sortByKey:对主播的音浪收入排序* join:打印每个主播的大区信息和音浪收入* distinct:统计当天开播的主播数量***/
object TransformationOpScala {def main(args: Array[String]): Unit = {val sc = getSparkContext//map:对集合中每个元素乘以2//mapOp(sc)//filter:过滤出集合中的偶数//filterOp(sc)//flatMap:将行拆分为单词//flatMap(sc)//groupByKey:对每个大区的主播进行分组//groupByKeyOp(sc)//groupByKeyOp2(sc)//reduceByKey:统计每个大区的主播数量//reduceByKeyOp(sc)//sortByKey:对主播的音浪收入排序//sortByKeyOp(sc)//sortByKeyOp2(sc)//join:打印每个主播的大区信息和音浪收入//joinOp(sc)//distinct:统计当天开播的大区信息//distinctOp(sc)sc.stop()}/*** distinct:统计当天开播的大区信息* @param sc*/def distinctOp(sc:SparkContext): Unit = {val dataRDD = sc.parallelize(Array((150001,"US"),(150002,"CN"),(150003,"CN"),(150004,"IN")))//由于是统计开播的大区信息,需要根据大区信息去重,所以只保留大区信息dataRDD.map(_._2).distinct().foreach(println(_))}/*** join:打印每个主播的大区信息和音浪收入* @param sc*/def joinOp(sc:SparkContext): Unit = {val dataRDD1 = sc.parallelize(Array((150001,"US"),(150002,"CN"),(150003,"CN"),(150004,"IN")))val dataRDD2 = sc.parallelize(Array((150001,400),(150002,200),(150003,300),(150004,100)))val joinRDD = dataRDD1.join(dataRDD2)//joinRDD.foreach(println(_))joinRDD.foreach(tup=>{//用户idval uid = tup._1val area_gold = tup._2//大区val area = area_gold._1//音浪收入val gold = area_gold._2println(uid + "\t" + area + "\t" + gold)})}/*** 第二种方法:可以动态指定排序的字段,比较灵活*sortByKey:对主播的音浪收入排序* @param sc*/def sortByKeyOp2(sc: SparkContext): Unit = {val dataRDD = sc.parallelize(Array((150001,400),(150002,200),(150003,300),(150004,100)))//sortBy的使用:可以动态指定排序的字段,比较灵活dataRDD.sortBy(_._2,false).foreach(println(_))}/*** 第一种方法* sortByKey:对主播的音浪收入排序* @param sc*/def sortByKeyOp(sc:SparkContext): Unit ={val dataRDD = sc.parallelize(Array((150001,400),(150002,200),(150003,300),(150004,100)))//由于需要对音浪收入进行排序,所以需要把音浪收入作为key,在这里要进行位置互换dataRDD.map(tup=>(tup._2,tup._1)).sortByKey(false)//默认是正序,第一个参数为true.foreach(println(_))//想要倒序需要把这个参数设置为false}/*** reduceByKey:统计每个大区的主播数量* @param sc*/def reduceByKeyOp(sc:SparkContext): Unit = {val dataRDD = sc.parallelize(Array((150001,"US"),(150002,"CN"),(150003,"CN"),(150004,"IN")))//由于这个需求只需要使用到大区信息,所以在map操作的时候只保留大区信息即可//为了计算大区的数量,所以在大区后面拼上了1,组装成了tuple2这种形式,这样就可以使用reduceByKey了dataRDD.map(tup=>(tup._2,1)).reduceByKey(_ + _).foreach(println(_))}/*** groupByKey:对每个大区的主播进行分组* tuple中数据超过2列* @param sc*/def groupByKeyOp2(sc: SparkContext): Unit = {val dataRDD = sc.parallelize(Array((150001,"US","male"),(150002,"CN","female"),(150003,"CN","male"),(150004,"IN","female")))//如果tuple中的数据列数超过了2列怎么办?//把需要作为key的那一列作为tuple2的第一列,剩下的可以再使用一个tuple2包装一下//此时map算子之后生成的新的数据格式是这样的:("US",(150001,"male"))//注意:如果你的数据结构比较复杂,你可以在执行每一个算子之后都调用foreach打印一下,确认数据的格式dataRDD.map(tup=>(tup._2,(tup._1,tup._3))).groupByKey().foreach(tup=>{//获取大区信息val area = tup._1print(area+":")//获取同一个大区对应的所有用户id和性别信息val it = tup._2for((uid,sex) <- it){print("<" + uid + "," + sex + "> ")}println()})}/*** groupByKey:对每个大区的主播进行分组* tuple中数据不超2列* @param sc*/def groupByKeyOp(sc:SparkContext): Unit = {val dataRDD = sc.parallelize(Array((150001,"US"),(150002,"CN"),(150003,"CN"),(150004,"IN")))//需要使用map对tuple中的数据进行位置互换,因为我们需要把大区作为key进行分组操作,//此时的key就是tuple中的第一列,其实在这里就可以把这个tuple认为是一个key-value//注意:在使用类似于groupByKey这种基于key的算子的时候,需要提前把RDD中的数据组装成tuple2这种形式//此时map算子之后生成的新的数据格式是这样的:("US",150001)//如果tuple中的数据列数超过了2列怎么办?看groupByKeyOp2dataRDD.map(tup=>(tup._2,tup._1)).groupByKey().foreach(tup=>{//获取大区信息val area = tup._1print(area + ":")//获取同一个大区对应的所有用户idval it = tup._2for(uid <- it){print(uid + " ")}println()})}/*** flatMap:将行拆分为单词* @param sc*/def flatMap(sc:SparkContext): Unit ={val dataRDD = sc.parallelize(Array("good good study","day day up"))dataRDD.flatMap(_.split(" ")).foreach(println(_))}/*** filter:过滤出集合中的偶数* @param sc*/def filterOp(sc:SparkContext): Unit ={val dataRDD = sc.parallelize(Array(1,2,3,4,5))dataRDD.filter(_ % 2 == 0).foreach(println(_))}/*** map:对集合中每个元素乘以2* @param sc*/def mapOp(sc:SparkContext): Unit ={val dataRDD = sc.parallelize(Array(1,2,3,4,5))dataRDD.map(_ * 2).foreach(println(_))}/*** 获取SparkContext* @return*/private def getSparkContext = {System.setProperty("hadoop.home.dir", "E:\\hadoop-3.2.0\\hadoop-3.2.0")val conf = new SparkConf()conf.setAppName("TransformationOpScala").setMaster("local")new SparkContext(conf)}}

2、java代码如下:

package com.imooc.java;import org.apache.spark.SparkConf;
import org.apache.spark.api.java.JavaPairRDD;
import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.api.java.JavaSparkContext;
import org.apache.spark.api.java.function.*;
import scala.Tuple2;
import scala.Tuple3;import java.util.Arrays;
import java.util.Iterator;/*** 需求:* map:对集合中每个元素乘以2* filter:过滤出集合中的偶数* flatMap:将行拆分为单词* groupByKey:对每个大区的主播进行分组* reduceByKey:统计每个大区的主播数量* sortByKey:对主播的音浪收入排序* join:打印每个主播的大区信息和音浪收入* distinct:统计当天开播的主播数量***/
public class TransformationOpJava {public static void main(String[] args) {JavaSparkContext sc = getSparkContext();//map:对集合中每个元素乘以2//mapOp(sc);//filter:过滤出集合中的偶数//filterOp(sc);//flatMap:将行拆分为单词//flatMapOp(sc);//groupByKey:对每个大区的主播进行分组//groupByKeyOp(sc);//groupByKeyOp2(sc);//reduceByKey:统计每个大区的主播数量//reduceByKeyOp(sc);//sortByKey:对主播的音浪收入排序//sortByKeyOp(sc);//join:打印每个主播的大区信息和音浪收入//joinOp(sc);//distinct:统计当天开播的主播数量//distinctOp(sc);sc.stop();}/*** distinct:统计当天开播的主播数量* @param sc*/private static void distinctOp(JavaSparkContext sc) {Tuple2<Integer,String> t1 = new Tuple2<Integer,String>(150001, "US");Tuple2<Integer,String> t2 = new Tuple2<Integer,String>(150002, "CN");Tuple2<Integer,String> t3 = new Tuple2<Integer,String>(150003, "CN");Tuple2<Integer,String> t4 = new Tuple2<Integer,String>(150004, "IN");JavaRDD<Tuple2<Integer, String>> dataRDD = sc.parallelize(Arrays.asList(t1, t2, t3, t4));dataRDD.map(new Function<Tuple2<Integer,String>, String>() {@Overridepublic String call(Tuple2<Integer, String> tup) throws Exception {return tup._2;}}).distinct().foreach(new VoidFunction<String>() {@Overridepublic void call(String area) throws Exception {System.out.println(area);}});}/*** join:打印每个主播的大区信息和音浪收入* @param sc*/private static void joinOp(JavaSparkContext sc) {Tuple2<Integer,String> t1 = new Tuple2<Integer,String>(150001, "US");Tuple2<Integer,String> t2 = new Tuple2<Integer,String>(150002, "CN");Tuple2<Integer,String> t3 = new Tuple2<Integer,String>(150003, "CN");Tuple2<Integer,String> t4 = new Tuple2<Integer,String>(150004, "IN");Tuple2<Integer,Integer> t5 = new Tuple2<Integer,Integer>(150001, 400);Tuple2<Integer,Integer> t6 = new Tuple2<Integer,Integer>(150002, 200);Tuple2<Integer,Integer> t7 = new Tuple2<Integer,Integer>(150003, 300);Tuple2<Integer,Integer> t8 = new Tuple2<Integer,Integer>(150004, 100);JavaRDD<Tuple2<Integer, String>> dataRDD1 = sc.parallelize(Arrays.asList(t1, t2, t3, t4));JavaRDD<Tuple2<Integer, Integer>> dataRDD2 = sc.parallelize(Arrays.asList(t5, t6, t7, t8));JavaPairRDD<Integer,String> dataRDD1Pair = dataRDD1.mapToPair(new PairFunction<Tuple2<Integer,String>, Integer, String>() {@Overridepublic Tuple2<Integer, String> call(Tuple2<Integer, String> tup) throws Exception {return new Tuple2<Integer,String>(tup._1,tup._2);}});JavaPairRDD<Integer,Integer> dataRDD2Pair = dataRDD2.mapToPair(new PairFunction<Tuple2<Integer,Integer>, Integer, Integer>() {@Overridepublic Tuple2<Integer, Integer> call(Tuple2<Integer, Integer> tup) throws Exception {return new Tuple2<Integer,Integer>(tup._1,tup._2);}});dataRDD1Pair.join(dataRDD2Pair).foreach(new VoidFunction<Tuple2<Integer, Tuple2<String, Integer>>>() {@Overridepublic void call(Tuple2<Integer, Tuple2<String, Integer>> tup) throws Exception {System.out.println(tup);}});}/*** 第二种方法:使用sortBy* sortByKey:对主播的音浪收入排序* @param sc*/private static void sortByKeyOp2(JavaSparkContext sc) {Tuple2<Integer,Integer> t1 = new Tuple2<Integer,Integer>(150001, 400);Tuple2<Integer,Integer> t2 = new Tuple2<Integer,Integer>(150002, 200);Tuple2<Integer,Integer> t3 = new Tuple2<Integer,Integer>(150003, 300);Tuple2<Integer,Integer> t4 = new Tuple2<Integer,Integer>(150004, 100);JavaRDD<Tuple2<Integer, Integer>> dataRDD = sc.parallelize(Arrays.asList(t1, t2, t3, t4));dataRDD.sortBy(new Function<Tuple2<Integer,Integer>, Integer>() {@Overridepublic Integer call(Tuple2<Integer, Integer> tup) throws Exception {return tup._2;}},false,1).foreach(new VoidFunction<Tuple2<Integer, Integer>>() {@Overridepublic void call(Tuple2<Integer, Integer> tup) throws Exception {System.out.println(tup);}});}/*** 第一种方法* sortByKey:对主播的音浪收入排序* @param sc*/private static void sortByKeyOp(JavaSparkContext sc) {Tuple2<Integer,Integer> t1 = new Tuple2<Integer,Integer>(150001, 400);Tuple2<Integer,Integer> t2 = new Tuple2<Integer,Integer>(150002, 200);Tuple2<Integer,Integer> t3 = new Tuple2<Integer,Integer>(150003, 300);Tuple2<Integer,Integer> t4 = new Tuple2<Integer,Integer>(150004, 100);JavaRDD<Tuple2<Integer, Integer>> dataRDD = sc.parallelize(Arrays.asList(t1, t2, t3, t4));dataRDD.mapToPair(new PairFunction<Tuple2<Integer,Integer>, Integer, Integer>() {@Overridepublic Tuple2<Integer, Integer> call(Tuple2<Integer, Integer> tup) throws Exception {return new Tuple2<Integer,Integer>(tup._2,tup._1);}}).sortByKey(false).foreach(new VoidFunction<Tuple2<Integer, Integer>>() {@Overridepublic void call(Tuple2<Integer, Integer> tup) throws Exception {System.out.println(tup);}});}/*** reduceByKey:统计每个大区的主播数量* @param sc*/private static void reduceByKeyOp(JavaSparkContext sc) {Tuple2<Integer,String> t1 = new Tuple2<Integer,String>(150001, "US");Tuple2<Integer,String> t2 = new Tuple2<Integer,String>(150002, "CN");Tuple2<Integer,String> t3 = new Tuple2<Integer,String>(150003, "CN");Tuple2<Integer,String> t4 = new Tuple2<Integer,String>(150004, "IN");JavaRDD<Tuple2<Integer,String>> dataRDD = sc.parallelize(Arrays.asList(t1, t2, t3, t4));dataRDD.mapToPair(new PairFunction<Tuple2<Integer,String>, String, Integer>() {@Overridepublic Tuple2<String, Integer> call(Tuple2<Integer, String> tup) throws Exception {return new Tuple2<String,Integer>(tup._2,1);}}).reduceByKey(new Function2<Integer, Integer, Integer>() {@Overridepublic Integer call(Integer i1, Integer i2) throws Exception {return i1 + i2;}}).foreach(new VoidFunction<Tuple2<String, Integer>>() {@Overridepublic void call(Tuple2<String, Integer> tup) throws Exception {System.out.println(tup);}});}/*** groupByKey:对每个大区的主播进行分组* tuple中数据超过2列* @param sc*/private static void groupByKeyOp2(JavaSparkContext sc) {Tuple3<Integer,String,String> t1 = new Tuple3<Integer,String,String>(150001, "US", "male");Tuple3<Integer,String,String> t2 = new Tuple3<Integer,String,String>(150002, "CN", "female");Tuple3<Integer,String,String> t3 = new Tuple3<Integer,String,String>(150003, "CN", "male");Tuple3<Integer,String,String> t4 = new Tuple3<Integer,String,String>(150004, "IN", "female");JavaRDD<Tuple3<Integer,String,String>> dataRDD = sc.parallelize(Arrays.asList(t1, t2, t3, t4));dataRDD.mapToPair(new PairFunction<Tuple3<Integer,String,String>, String, Tuple2<Integer,String>>() {@Overridepublic Tuple2<String, Tuple2<Integer, String>> call(Tuple3<Integer, String, String> tup) throws Exception {return new Tuple2<String,Tuple2<Integer, String>>(tup._2(),new Tuple2<Integer,String>(tup._1(),tup._3()));}}).groupByKey().foreach(new VoidFunction<Tuple2<String, Iterable<Tuple2<Integer, String>>>>() {@Overridepublic void call(Tuple2<String, Iterable<Tuple2<Integer, String>>> tup) throws Exception {//获取大区信息String area = tup._1;System.out.println(area + ":");//获取同一个大区对应的所有用户id和性别信息Iterable<Tuple2<Integer,String>> it = tup._2;for(Tuple2<Integer,String> tu:it){System.out.println("<" + tu._1 + "," + tu._2 + ">");}System.out.println();}});}/*** groupByKey:对每个大区的主播进行分组* tuple中数据不超过2列* @param sc*/private static void groupByKeyOp(JavaSparkContext sc) {Tuple2<Integer,String> t1 = new Tuple2<Integer,String>(150001, "US");Tuple2<Integer,String> t2 = new Tuple2<Integer,String>(150002, "CN");Tuple2<Integer,String> t3 = new Tuple2<Integer,String>(150003, "CN");Tuple2<Integer,String> t4 = new Tuple2<Integer,String>(150004, "IN");JavaRDD<Tuple2<Integer,String>> dataRDD = sc.parallelize(Arrays.asList(t1,t2,t3,t4));//如果想要使用...ByKey之类的算子,需要先使用...ToPair算子dataRDD.mapToPair(new PairFunction<Tuple2<Integer,String>, String, Integer>() {@Overridepublic Tuple2<String, Integer> call(Tuple2<Integer, String> tup) throws Exception {return new Tuple2<String,Integer>(tup._2,tup._1);}}).groupByKey().foreach(new VoidFunction<Tuple2<String, Iterable<Integer>>>() {@Overridepublic void call(Tuple2<String, Iterable<Integer>> tup) throws Exception {//获取大区信息String area = tup._1;System.out.println(area + ":");//获取同一个大区对应的所有用户idIterable<Integer> it = tup._2;for(Integer uid:it){System.out.println(uid + " ");}System.out.println();}});}/*** flatMap:将行拆分为单词* @param sc*/private static void flatMapOp(JavaSparkContext sc) {JavaRDD<String> dataRDD = sc.parallelize(Arrays.asList("good good study", "day day up"));dataRDD.flatMap(new FlatMapFunction<String, String>() {@Overridepublic Iterator<String> call(String line) throws Exception {String[] words = line.split(" ");return Arrays.asList(words).iterator();}}).foreach(new VoidFunction<String>() {@Overridepublic void call(String word) throws Exception {System.out.println(word);}});}/*** filter:过滤出集合中的偶数* @param sc*/private static void filterOp(JavaSparkContext sc) {JavaRDD<Integer> dataRDD = sc.parallelize(Arrays.asList(1, 2, 3, 4, 5));dataRDD.filter(new Function<Integer, Boolean>() {@Overridepublic Boolean call(Integer i1) throws Exception {return i1 % 2 == 0;}}).foreach(new VoidFunction<Integer>() {@Overridepublic void call(Integer i1) throws Exception {System.out.println(i1);}});}/*** map:对集合中每个元素乘以2* @param sc*/private static void mapOp(JavaSparkContext sc) {JavaRDD<Integer> dataRDD = sc.parallelize(Arrays.asList(1,2,3,4,5));dataRDD.map(new Function<Integer, Integer>() {@Overridepublic Integer call(Integer i1) throws Exception {return i1 * 2;}}).foreach(new VoidFunction<Integer>() {@Overridepublic void call(Integer i1) throws Exception {System.out.println(i1);}});}/*** 获取SparkContext* @return*/private static JavaSparkContext getSparkContext(){System.setProperty("hadoop.home.dir", "E:\\hadoop-3.2.0\\hadoop-3.2.0");SparkConf conf = new SparkConf();conf.setAppName("TransformationOpJava").setMaster("local");return new JavaSparkContext(conf);}}

四、常用Action介绍

先看一下官网中的action算子

接下来看一下常见的Action算子

算子               介绍
reduce          将RDD中的所有元素进行聚合操作
collect         将RDD中所有元素获取到本地客户端(Driver)
count           获取RDD中元素总数
take(n)         获取RDD中前n个元素
saveAsTextFile  将RDD中元素保存到文件中,对每个元素调用toString
countByKey      对每个key对应的值进行count计数
foreach         遍历RDD中的每个元素

五、Action操作开发实战

下面针对常见的Action算子来写一些具体案例

reduce:聚合计算
collect:获取元素集合
take(n):获取前n个元素
count:获取元素总数
saveAsTextFile:保存文件
countByKey:统计相同的key出现多少次
foreach:迭代遍历元素

1、scala代码如下:

package com.imooc.scalaimport org.apache.spark.{SparkConf, SparkContext}/*** 需求:Action实战* reduce:聚合计算* collect:获取元素集合* take(n):获取前n个元素* count:获取元素总数* saveAsTextFile:保存文件* countByKey:统计相同的key出现多少次* foreach:迭代遍历元素**/
object ActionOpScala {def main(args: Array[String]): Unit = {val sc = getSparkContext//reduce:聚合计算//reduceOp(sc)//collect:获取元素集合//collectOp(sc)//take(n):获取前n个元素//takeOp(sc)//count:获取元素总数//countOp(sc)//saveAsTextFile:保存文件//saveAsTextFileOp(sc)//countByKey:统计相同的key出现多少次//countByKeyOp(sc)//foreach:迭代遍历元素//foreachOp(sc)sc.stop()}/*** foreach:迭代遍历元素* @param sc*/def foreachOp(sc: SparkContext): Unit = {val dataRDD = sc.parallelize(Array(1,2,3,4,5))//注意:foreach不仅限于执行println操作,这个只是在测试的时候使用的//实际工作中如果需要把计算的结果保存到第三方的存储介质中,就需要使用foreach,//在里面实现具体向外部输出数据的代码dataRDD.foreach(println(_))}/*** countByKey:统计相同的key出现多少次* @param sc*/def countByKeyOp(sc: SparkContext): Unit = {val dataRDD = sc.parallelize(Array(("A",1001),("B",1002),("A",1003),("C",1004)))//返回的是一个map类型的数据val res = dataRDD.countByKey()for((k,v) <- res){println(k+","+v)}}/*** saveAsTextFile:保存文件* @param sc*/def saveAsTextFileOp(sc: SparkContext): Unit = {val dataRDD = sc.parallelize(Array(1,2,3,4,5))//指定HDFS的路径信息即可,需要指定一个不存在的目录dataRDD.saveAsTextFile("hdfs://bigdata01:9000/out0524")}/*** count:获取元素总数* @param sc*/def countOp(sc: SparkContext): Unit = {val dataRDD = sc.parallelize(Array(1,2,3,4,5))val res = dataRDD.count()println(res)}/*** take(n):获取前n个元素* @param sc*/def takeOp(sc: SparkContext): Unit = {val dataRDD = sc.parallelize(Array(1,2,3,4,5))//从RDD中获取前2个元素val res = dataRDD.take(2)for(item <- res){println(item)}}/*** collect:获取元素集合* @param sc*/def collectOp(sc: SparkContext): Unit = {val dataRDD = sc.parallelize(Array(1,2,3,4,5))//collect返回的是一个Array数组//注意:如果RDD中数据量过大,不建议使用collect,因为最终的数据会返回给Driver进程所在的节点//如果想要获取几条数据,查看一下数据格式,可以使用take(n)val res = dataRDD.collect()for(item <- res){println(item)}}/*** reduce:聚合计算* @param sc*/def reduceOp(sc: SparkContext): Unit = {val dataRDD = sc.parallelize(Array(1,2,3,4,5))val num = dataRDD.reduce(_ + _)println(num)}/*** 获取SparkContext* @return*/private def getSparkContext = {System.setProperty("hadoop.home.dir", "E:\\hadoop-3.2.0\\hadoop-3.2.0")val conf = new SparkConf()conf.setAppName("ActionOpScala").setMaster("local")new SparkContext(conf)}}

2、java代码如下:

package com.imooc.java;import org.apache.spark.SparkConf;
import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.api.java.JavaSparkContext;
import org.apache.spark.api.java.function.Function2;
import org.apache.spark.api.java.function.PairFunction;
import org.apache.spark.api.java.function.VoidFunction;
import scala.Tuple2;import java.util.Arrays;
import java.util.List;
import java.util.Map;/*** 需求:Action实战* reduce:聚合计算* collect:获取元素集合* take(n):获取前n个元素* count:获取元素总数* saveAsTextFile:保存文件* countByKey:统计相同的key出现多少次* foreach:迭代遍历元素**/
public class ActionOpJava {public static void main(String[] args) {JavaSparkContext sc = getSparkContext();//reduce:聚合计算//reduceOp(sc);//collect:获取元素集合//collectOp(sc);//take(n):获取前n个元素//takeOp(sc);//count:获取元素总数//countOp(sc);//saveAsTextFile:保存文件//saveAsTextFileOp(sc);//countByKey:统计相同的key出现多少次//countByKeyOp(sc);//foreach:迭代遍历元素//foreachOp(sc);sc.stop();}/*** foreach:迭代遍历元素* @param sc*/private static void foreachOp(JavaSparkContext sc) {JavaRDD<Integer> dataRDD = sc.parallelize(Arrays.asList(1, 2, 3, 4, 5));dataRDD.foreach(new VoidFunction<Integer>() {@Overridepublic void call(Integer i) throws Exception {System.out.println(i);}});}/*** countByKey:统计相同的key出现多少次* @param sc*/private static void countByKeyOp(JavaSparkContext sc) {Tuple2<String, Integer> t1 = new Tuple2<>("A", 1001);Tuple2<String, Integer> t2 = new Tuple2<>("B", 1002);Tuple2<String, Integer> t3 = new Tuple2<>("A", 1003);Tuple2<String, Integer> t4 = new Tuple2<>("C", 1004);JavaRDD<Tuple2<String, Integer>> dataRDD = sc.parallelize(Arrays.asList(t1, t2, t3, t4));//想要使用countByKey,需要先使用mapToPair对RDD进行转换Map<String, Long> res = dataRDD.mapToPair(new PairFunction<Tuple2<String, Integer>, String, Integer>() {@Overridepublic Tuple2<String, Integer> call(Tuple2<String, Integer> tup)throws Exception {return new Tuple2<String, Integer>(tup._1, tup._2);}}).countByKey();for(Map.Entry<String,Long> entry: res.entrySet()){System.out.println(entry.getKey()+","+entry.getValue());}}/*** saveAsTextFile:保存文件* @param sc*/private static void saveAsTextFileOp(JavaSparkContext sc) {JavaRDD<Integer> dataRDD = sc.parallelize(Arrays.asList(1, 2, 3, 4, 5));dataRDD.saveAsTextFile("hdfs://bigdata01:9000/out05242");}/*** count:获取元素总数* @param sc*/private static void countOp(JavaSparkContext sc) {JavaRDD<Integer> dataRDD = sc.parallelize(Arrays.asList(1, 2, 3, 4, 5));long res = dataRDD.count();System.out.println(res);}/*** take(n):获取前n个元素* @param sc*/private static void takeOp(JavaSparkContext sc) {JavaRDD<Integer> dataRDD = sc.parallelize(Arrays.asList(1, 2, 3, 4, 5));List<Integer> res = dataRDD.take(2);for(Integer item : res){System.out.println(item);}}/*** collect:获取元素集合* @param sc*/private static void collectOp(JavaSparkContext sc) {JavaRDD<Integer> dataRDD = sc.parallelize(Arrays.asList(1, 2, 3, 4, 5));List<Integer> res = dataRDD.collect();for(Integer item : res){System.out.println(item);}}/*** reduce:聚合计算* @param sc*/private static void reduceOp(JavaSparkContext sc) {JavaRDD<Integer> dataRDD = sc.parallelize(Arrays.asList(1, 2, 3, 4, 5));Integer num = dataRDD.reduce(new Function2<Integer, Integer, Integer>() {@Overridepublic Integer call(Integer i1, Integer i2) throws Exception {return i1 + i2;}});System.out.println(num);}/*** 获取SparkContext* @return*/private static JavaSparkContext getSparkContext() {System.setProperty("hadoop.home.dir", "E:\\hadoop-3.2.0\\hadoop-3.2.0");SparkConf conf = new SparkConf();conf.setAppName("ActionOpJava").setMaster("local");return new JavaSparkContext(conf);}}

Spark07:【案例】Transformation和Action相关推荐

  1. Spark RDD/Core 编程 API入门系列 之rdd实战(rdd基本操作实战及transformation和action流程图)(源码)(三)...

    本博文的主要内容是: 1.rdd基本操作实战 2.transformation和action流程图 3.典型的transformation和action RDD有3种操作: 1.  Trandform ...

  2. Spark的transformation和action算子简介

    transformation算子 map(func) 返回一个新的分布式数据集,由每个原元素经过func函数处理后的新元素组成 filter(func) 返回一个新的数据集,由经过func函数处理后返 ...

  3. Spark(4)——transformation、action、persist

    RDD数据是不可变的: transformation 将一个RDD变成一个新的RDD' 比如mapreduce中的map操作,将数据集里的元素做处理变成新的元素,形成RDD'.transformati ...

  4. RDD的两种操作(Transformation和Action)

    RDD创建后就可以在RDD上进行数据处理.RDD支持两种操作:转换(transformation),即从现有的数据集创建一个新的数据集:动作(action),即在数据集上进行计算后,返回一个值给Dri ...

  5. spark rdd Transformation和Action 剖析

    1.看到 这篇总结的这么好, 就悄悄的转过来,供学习 wordcount.toDebugString查看RDD的继承链条 所以广义的讲,对任何函数进行某一项操作都可以认为是一个算子,甚至包括求幂次,开 ...

  6. Spark学习之路 (六)Spark Transformation和Action

    Transformation算子 基本的初始化 java static SparkConf conf = null;static JavaSparkContext sc = null;static { ...

  7. Spark transformation算子案例

    Spark支持两种RDD操作:transformation和action  在本文中,将对几个常用的transformation算子进行案例演示,采用Java和Scala两种语言对代码进行编写  其中 ...

  8. Spark action算子案例

    在上篇文章中,我们对Spark中几种常用的transformation算子通过Java和Scala两种代码分别进行了案例演示,Spark transformation算子案例  而在本文中,我们将继续 ...

  9. 【Spark】(五)Spark Transformation(转换算子) 和 Action(执行算子)

    文章目录 一.Transformation 和 Action 1.转换操作 2.行动操作 二.map.flatMap.mapParations.mapPartitionsWithIndex 2.1 m ...

最新文章

  1. 归并排序以及三种常见优化
  2. 收藏 | 精选11篇AI领域论文(附代码、数据集链接)
  3. Spring MVC实现上传文件报错解决方案
  4. TikTok信息流广告怎么做才有效果?我从100个营销短视频中总结了这些方法
  5. Mvc全局过滤器与Action排除
  6. Java中如何读写cookie (二)
  7. 提高阅读源代码的效率 转
  8. 2018年对PHP的新认知
  9. 移动IM开发那些事:技术选型和常见问题
  10. TestinPro应用与DevOps之路
  11. angular 注入器配置_Angular依赖注入介绍
  12. FreeRTOS任务优先级
  13. 毕业准备:外企面试--基本涵盖了所有问题【附带有答案版本】
  14. python文档字符串格式_Python字符串及文本模式方法详解
  15. 检查PHP扩展是否安装成功
  16. 【Android】1.开发环境搭建
  17. iOS及Mac开源项目和学习资料【超级全面】
  18. JavaWeb学习总结详解
  19. Python之深入解析Numpy的高级操作和使用
  20. 熊出没之伐木机器人_熊出没:最强大的4大机器人登场,熊大熊二“苦不堪言”...

热门文章

  1. 施密特触发器运算放大电路
  2. Python爬虫——京东商品信息 前期准备
  3. 工行H5移动在线支付问题
  4. JDK1.8新特性之Lambda表达式+Stream流+函数式接口
  5. D435i相机获取某一点深度图像的深度值(ROS实现以及官方API调用)
  6. Elasticsearch启动遇到nofile、nproc、jvm等报错
  7. se linux影响性能,性能 | SELinux+
  8. Emergency 紧急报文的实施
  9. popstate、pushState、replaceState操作浏览器历史记录
  10. ACM MM2020 | 一个卡通人脸识别的基准数据集