在上篇文章中,我们对Spark中几种常用的transformation算子通过Java和Scala两种代码分别进行了案例演示,Spark transformation算子案例 
而在本文中,我们将继续对Spark的另一种RDD操作action进行讲解。对常用的action算子,使用Java和Scala两种代码进行简单的案例演示。

action常用算子介绍

Java版本

@SuppressWarnings("unused")
public class ActionOperation {public static void main(String[] args) {
//      reduce();
//      collect();
//      count();
//      take();
//      saveAsTextFile();countByKey();}private static void reduce(){// 创建SparkConf和JavaSparkContextSparkConf conf = new SparkConf().setAppName("reduce").setMaster("local");JavaSparkContext sc = new JavaSparkContext(conf);// 有一个集合,里面有1到10,10个数字,现在要对10个数字进行累加List<Integer> numberList = Arrays.asList(1, 2, 3, 4, 5, 6, 7, 8, 9, 10);JavaRDD<Integer> numbers = sc.parallelize(numberList);// 使用reduce操作对集合中的数字进行累加// reduce操作的原理:// 首先将第一个和第二个元素,传入call()方法,进行计算,会获取一个结果,比如1 + 2 = 3// 接着将该结果与下一个元素传入call()方法,进行计算,比如3 + 3 = 6// 所以reduce操作的本质,就是聚合,将多个元素聚合成一个元素int sum = numbers.reduce(new Function2<Integer, Integer, Integer>() {private static final long serialVersionUID = 1L;@Overridepublic Integer call(Integer v1, Integer v2) throws Exception {return v1 + v2;}});System.out.println(sum);// 关闭JavaSparkContextsc.close();}private static void collect(){// 创建SparkConf和JavaSparkContextSparkConf conf = new SparkConf().setAppName("collect").setMaster("local");JavaSparkContext sc = new JavaSparkContext(conf);// 有一个集合,里面有1到10,10个数字,现在要对10个数字进行累加List<Integer> numberList = Arrays.asList(1, 2, 3, 4, 5, 6, 7, 8, 9, 10);JavaRDD<Integer> numbers = sc.parallelize(numberList);// 使用map操作将集合中的所有数字乘以2JavaRDD<Integer> doubleNumbers = numbers.map(new Function<Integer, Integer>() {private static final long serialVersionUID = 1L;@Overridepublic Integer call(Integer v1) throws Exception {return v1 * 2;}});// foreach action操作,是在远程集群上遍历RDD中的元素// 而使用collect操作,将分布式在远程集群上的doubleNumbers RDD的数据拉取到本地// 这种方式,一般不建议使用,因为如果rdd中的数据量比较大的话,比如超过1万条// 那么性能会比较差,因为要从远程走大量的网路传输,将数据获取到本地// 此外,除了性能差,还可能在rdd中数据量特别大的情况下,发生oom异常,内存溢出// 因此,通常,还是推荐使用foreach action操作,来对最终的rdd元素进行处理List<Integer> doubleNumberList = doubleNumbers.collect();for(Integer num : doubleNumberList){System.out.println(num);}// 关闭JavaSparkContextsc.close();}private static void count(){// 创建SparkConf和JavaSparkContextSparkConf conf = new SparkConf().setAppName("count").setMaster("local");JavaSparkContext sc = new JavaSparkContext(conf);// 有一个集合,里面有1到10,10个数字,现在要对10个数字进行累加List<Integer> numberList = Arrays.asList(1, 2, 3, 4, 5, 6, 7, 8, 9, 10);JavaRDD<Integer> numbers = sc.parallelize(numberList);// 对RDD使用count操作,统计RDD中有多少个元素long count = numbers.count();System.out.println(count);// 关闭JavaSparkContextsc.close();}private static void take(){// 创建SparkConf和JavaSparkContextSparkConf conf = new SparkConf().setAppName("take").setMaster("local");JavaSparkContext sc = new JavaSparkContext(conf);// 有一个集合,里面有1到10,10个数字,现在要对10个数字进行累加List<Integer> numberList = Arrays.asList(1, 2, 3, 4, 5, 6, 7, 8, 9, 10);JavaRDD<Integer> numbers = sc.parallelize(numberList);  // take操作,与collect类似,也是从远程集群上,获取rdd的数据,拉取到本地// 但是collect操作是获取rdd的所有数据,take只是获取n个数据List<Integer> top3Numbers = numbers.take(3);for(Integer num : top3Numbers){System.out.println(num);}// 关闭JavaSparkContextsc.close();}private static void saveAsTextFile(){// 创建SparkConf和JavaSparkContextSparkConf conf = new SparkConf().setAppName("saveAsTextFile").setMaster("local");JavaSparkContext sc = new JavaSparkContext(conf);// 有一个集合,里面有1到10,10个数字,现在要对10个数字进行累加List<Integer> numberList = Arrays.asList(1, 2, 3, 4, 5, 6, 7, 8, 9, 10);JavaRDD<Integer> numbers = sc.parallelize(numberList);  // 使用map操作将集合中的所有数字乘以2JavaRDD<Integer> doubleNumbers = numbers.map(new Function<Integer, Integer>() {private static final long serialVersionUID = 1L;@Overridepublic Integer call(Integer v1) throws Exception {return v1 * 2;}});// 直接将rdd中的数据,保存到HDFS文件中// 但是要注意,我们这里只能指定文件夹,也就是目录// 那么实际上,会保存为目录中的/double_number/part-00000文件doubleNumbers.saveAsTextFile("hdfs://spark1:9000/double_number");// 关闭JavaSparkContextsc.close();}private static void countByKey(){// 创建SparkConf和JavaSparkContextSparkConf conf = new SparkConf().setAppName("countByKey").setMaster("local");JavaSparkContext sc = new JavaSparkContext(conf);// 模拟集合@SuppressWarnings("unchecked")List<Tuple2<String, String>> studentList = Arrays.asList(new Tuple2<String, String>("class1", "leo"),new Tuple2<String, String>("class2", "jack"),new Tuple2<String, String>("class1", "marry"),new Tuple2<String, String>("class2", "tom"),new Tuple2<String, String>("class2", "david"));// 并行化创建集合,创建JavaPairRDDJavaPairRDD<String, String> students = sc.parallelizePairs(studentList);// 对RDD应用countByKey操作,统计每个班级的学生人数,也就是统计每个key对应的元素个数// 这就是countByKey的作用// countByKey返回的类型,直接就是Map<String, Object>Map<String, Object> studentCounts = students.countByKey();for(Map.Entry<String, Object> studentCount : studentCounts.entrySet()){System.out.println(studentCount.getKey() + ":" + studentCount.getValue());}// 关闭JavaSparkContextsc.close();}
}

Scala版本

object ActionOperation {def main(args: Array[String]): Unit = {
//    reduce()
//    collect()
//    count()
//    take()
//    saveAsTextFile()countByKey()}def reduce(): Unit = {val conf = new SparkConf().setAppName("reduce").setMaster("local")val sc = new SparkContext(conf)val numberArray = Array(1, 2, 3, 4, 5, 6, 7, 8, 9, 10)val numbers = sc.parallelize(numberArray, 1)val sum = numbers.reduce(_ + _)println(sum)}def collect(): Unit = {val conf = new SparkConf().setAppName("reduce").setMaster("local")val sc = new SparkContext(conf)val numberArray = Array(1, 2, 3, 4, 5, 6, 7, 8, 9, 10)val numbers = sc.parallelize(numberArray, 1)val doubleNumbers = numbers.map { num => num * 2 }val doubleNumberArray = doubleNumbers.collect()for(num <- doubleNumberArray){println(num)}}def count(): Unit = {val conf = new SparkConf().setAppName("reduce").setMaster("local")val sc = new SparkContext(conf)val numberArray = Array(1, 2, 3, 4, 5, 6, 7, 8, 9, 10)val numbers = sc.parallelize(numberArray, 1)val count = numbers.count()println(count)    }def take(): Unit = {val conf = new SparkConf().setAppName("reduce").setMaster("local")val sc = new SparkContext(conf)val numberArray = Array(1, 2, 3, 4, 5, 6, 7, 8, 9, 10)val numbers = sc.parallelize(numberArray, 1)val top3Numbers = numbers.take(3)for(num <- top3Numbers){println(num)}}def saveAsTextFile(): Unit = {val conf = new SparkConf().setAppName("reduce").setMaster("local")val sc = new SparkContext(conf)val numberArray = Array(1, 2, 3, 4, 5, 6, 7, 8, 9, 10)val numbers = sc.parallelize(numberArray, 1)val doubleNumbers = numbers.map { num => num * 2 }doubleNumbers.saveAsTextFile("hdfs://spark1:9000/double_number")}def countByKey(): Unit = {val conf = new SparkConf().setAppName("reduce").setMaster("local")val sc = new SparkContext(conf)val studentList = Array(new Tuple2("class1", "leo"),new Tuple2("class2", "jack"),new Tuple2("class1", "tom"),new Tuple2("class2", "jen"),new Tuple2("class2", "marry"))val students = sc.parallelize(studentList, 1)val studentCounts = students.countByKey()println(studentCounts)}}

Spark action算子案例相关推荐

  1. Spark transformation算子案例

    Spark支持两种RDD操作:transformation和action  在本文中,将对几个常用的transformation算子进行案例演示,采用Java和Scala两种语言对代码进行编写  其中 ...

  2. Spark的transformation和action算子简介

    transformation算子 map(func) 返回一个新的分布式数据集,由每个原元素经过func函数处理后的新元素组成 filter(func) 返回一个新的数据集,由经过func函数处理后返 ...

  3. Spark RDD算子(transformation + action)

    概念 RDD(Resilient Distributed Dataset)叫做弹性分布式数据集,是Spark中最基本的数据抽象,它代表一个不可变.可分区.里面的元素可并行计算的集合.RDD具有数据流模 ...

  4. 大数据技术之_19_Spark学习_07_Spark 性能调优 + 数据倾斜调优 + 运行资源调优 + 程序开发调优 + Shuffle 调优 + GC 调优 + Spark 企业应用案例

    大数据技术之_19_Spark学习_07 第1章 Spark 性能优化 1.1 调优基本原则 1.1.1 基本概念和原则 1.1.2 性能监控方式 1.1.3 调优要点 1.2 数据倾斜优化 1.2. ...

  5. Spark部分算子及使用

    Spark部分算子及使用 案例一:flatmap算子 def main(args: Array[String]): Unit = {val conf = new SparkConf().setAppN ...

  6. Spark学习之Spark RDD算子

    个人主页zicesun.com 这里,从源码的角度总结一下Spark RDD算子的用法. 单值型Transformation算子 map /*** Return a new RDD by applyi ...

  7. Spark _30_SparkStreaming算子操作Driver HA

    SparkStreaming算子操作 foreachRDD output operation算子,必须对抽取出来的RDD执行action类算子,代码才能执行. import org.apache.sp ...

  8. Dstream的action算子与RDD的action算子

    Dstream action算子 print() 在运行流应用程序的驱动程序节点上打印DStream中每批数据的前10个元素.这对于开发和调试非常有用.这在Python API中称为pprint(). ...

  9. 【大数据开发】SparkCore——进阶算子、Action算子、查看分区数的三种方式

    源代码中的大写V,指的是value rdd.getNumberPartitions获取分区数量 Transformation算⼦全都是RDD[U,T]类型的 Action算子的返回值一般情况下不会是R ...

最新文章

  1. WCF中的序列化[上篇]
  2. 趣谈网络协议笔记-二(第五讲)
  3. Asp.net禁用site.Mobile.Master
  4. 华gt2升级鸿蒙,华为手表GT2 Pro已开始内部测试,升级鸿蒙操作系统
  5. UVA272--TEX Quotes【字符串】
  6. PHP开发中涉及到emoji表情的几种处理方法
  7. git不区分文件名大小写这种坑当然要跳出来
  8. MySQL联合查询及取别名
  9. 基因组信息学参考习题
  10. 计算机上面mac怎么查看,怎么看电脑的mac地址
  11. 2B领域最大的媒体沙龙又来了,你以什么姿势参加?
  12. JVM 垃圾收集算法及垃圾收集器
  13. 工业设计公司:从外观设计到软硬件设计
  14. 【提前批】【第二批】CUHK CSE 面经2022.6.17
  15. android 电容屏多点触控协议
  16. 第二届太原理工大学程序设计新生赛决赛-(Cappuccino ~ the end of journey-M)简单模拟
  17. 亲测好用的6个临时邮箱推荐
  18. 新数据整合的五大方式
  19. Redis性能优化方案总结
  20. 差之毫厘谬之千里!带你认识CPU后缀含义

热门文章

  1. sql oracle 自增长字段,在Oracle、MySQL、MS SQL Server中创设自动增长字段
  2. easyexcel获取所有sheet页名称_老板让我汇总多个sheet,我不会,同事却说使用PQ仅需2步搞定...
  3. java获取进程端口_查看进程的端口号
  4. Python字符串介绍
  5. DHCP和DHCP中继功能与配置
  6. 编写字符串反转函数 .
  7. 金山手机控usb调试模式开启工具_不看不知道手机有多卡!一款深挖手机的良心工具...
  8. linux卸载mariadb数据库,CentOS yum 安装、卸载MariaDB数据库
  9. java窗口how2j_How2J Java 基础
  10. wordpress如何让百度快速收录_如何解决百度收录问题 - 百度蜘蛛池