目录

  • 基本概念
  • 算子介绍
    • 1. reduce
    • 2. collect
    • 3. count
    • 4. first
    • 5. take
    • 6. takeOrdered
    • 案例实操1-6
    • 7. aggregate
    • 8. fold
    • 案例实操7-8
    • 9. countByKey
    • 案例实操
    • 10. save相关算子
    • 案例实操
    • 11. foreach
    • 案例实操

基本概念

行动算子主要是将在数据集上运行计算后的数值返回到驱动程序,从而触发触发作业(Job)的执行。其底层代码调用的就是runJob的方法,底层会创建ActiveJob,并提交执行。

算子介绍

1. reduce

函数定义

def reduce(f: (T, T) => T): T

说明
聚集RDD 中的所有元素,先聚合分区内数据,再聚合分区间数据。

2. collect

函数定义

def collect(): Array[T]

说明
在驱动程序中,以数组Array 的形式返回数据集的所有元素 。

3. count

函数定义

def count(): Long

说明
返回RDD 中元素的个数。

4. first

函数定义

def first(): T

说明
返回RDD 中的第一个元素 。

5. take

函数定义

def take(num: Int): Array[T]

说明
返回一个由RDD 的前 n 个元素组成的数组。

6. takeOrdered

函数定义

def takeOrdered(num: Int)(implicit ord: Ordering[T]): Array[T]

说明
返回该RDD 排序后的前 n 个元素组成的数组。

案例实操1-6

package com.atguigu.bigdata.spark.core.rdd.actionimport org.apache.spark.{SparkConf, SparkContext}object Spark02_RDD_Operator_Action {def main(args: Array[String]): Unit = {//TODO 准备环境val sparkConf: SparkConf = new SparkConf().setMaster("local[*]").setAppName("Operator")val sc = new SparkContext(sparkConf)val rdd = sc.makeRDD(List(1,2,3,4))//TODO - 行动算子//reduce
//    val i = rdd.reduce(_ + _)
//
//    println(i)//collect:方法会讲不同分区的数据按照分区顺序采集到Driver端内存中,形成数组
//    val ints: Array[Int] = rdd.collect()
//
//    println(ints.mkString(","))//统计数据源中的数据的个数val cnt = rdd.count()println(cnt)//first:获取数据源中第一个数据val first = rdd.first()println(first)//take:获取N个数据val ints: Array[Int] = rdd.take(3)println(ints.mkString(","))val rdd1 = sc.makeRDD(List(4,2,3,1))//takeOrdered:排序后,获取N个数据val ints1: Array[Int] = rdd1.takeOrdered(3)println(ints1.mkString(","))sc.stop()}
}

7. aggregate

函数定义

def aggregate[U: ClassTag](zeroValue: U)(seqOp: (U, T) => U, combOp: (U, U) => U): U

说明
分区的数据通过初始值和分区内的数据进行聚合,然后再和初始值进行分区间的数据聚合。

8. fold

函数定义

def fold(zeroValue: T)(op: (T, T) => T): T

说明
折叠操作,aggregate 的简化版操作。

案例实操7-8

package com.atguigu.bigdata.spark.core.rdd.actionimport org.apache.spark.{SparkConf, SparkContext}object Spark03_RDD_Operator_Action {def main(args: Array[String]): Unit = {//TODO 准备环境val sparkConf: SparkConf = new SparkConf().setMaster("local[*]").setAppName("Operator")val sc = new SparkContext(sparkConf)val rdd = sc.makeRDD(List(1,2,3,4),2)//TODO - 行动算子//aggregateByKey:初始值只会参与分区内的计算//aggregate:初始值会参与分区内的计算,并且参与分区间的计算
//    val result: Int = rdd.aggregate(10)(_ + _, _ + _)val result: Int = rdd.fold(10)(_ + _)println(result)sc.stop()}
}

9. countByKey

函数定义

def countByKey(): Map[K, Long]

说明
统计每种key 的个数。

package com.atguigu.bigdata.spark.core.rdd.actionimport org.apache.spark.{SparkConf, SparkContext}object Spark04_RDD_Operator_Action {def main(args: Array[String]): Unit = {//TODO 准备环境val sparkConf: SparkConf = new SparkConf().setMaster("local[*]").setAppName("Operator")val sc = new SparkContext(sparkConf)val rdd = sc.makeRDD(List(1,1,3,4),2)val rdd1 = sc.makeRDD(List(("a",1),("a",2),("a",3)))//TODO - 行动算子val intToLong: collection.Map[Int, Long] = rdd.countByValue()println(intToLong)val stringToLong: collection.Map[String, Long] = rdd1.countByKey()println(stringToLong)sc.stop()}
}

案例实操

10. save相关算子

函数定义

def saveAsTextFile(path: String): Unit
def saveAsObjectFile(path: String): Unit
def saveAsSequenceFile( path: String, codec: Option[Class[_ <: CompressionCodec]] = None): Unit

说明
将数据保存到不同格式的文件中。

案例实操

package com.atguigu.bigdata.spark.core.rdd.actionimport org.apache.spark.{SparkConf, SparkContext}object Spark05_RDD_Operator_Action {def main(args: Array[String]): Unit = {//TODO 准备环境val sparkConf: SparkConf = new SparkConf().setMaster("local[*]").setAppName("Operator")val sc = new SparkContext(sparkConf)val rdd = sc.makeRDD(List(("a",1),("a",2),("a",3)))//TODO - 行动算子rdd.saveAsTextFile("output")rdd.saveAsObjectFile("output1")//saveAsSequenceFile方法要求数据的格式必须为K-V类型rdd.saveAsSequenceFile("output2")sc.stop()}
}

11. foreach

函数定义

def foreach(f: T => Unit): Unit = withScope { val cleanF = sc.clean(f) sc.runJob(this, (iter: Iterator[T]) => iter.foreach(cleanF))
}

说明
分布式遍历RDD 中的每一个元素,调用指定函数。

案例实操

package com.atguigu.bigdata.spark.core.rdd.actionimport org.apache.spark.{SparkConf, SparkContext}object Spark06_RDD_Operator_Action {def main(args: Array[String]): Unit = {//TODO 准备环境val sparkConf: SparkConf = new SparkConf().setMaster("local[*]").setAppName("Operator")val sc = new SparkContext(sparkConf)val rdd = sc.makeRDD(List(1,2,3,4) )//foreach其实Driver端内存集合循环遍历的方法rdd.collect().foreach(println)println("*************")//foreach其实是Executor端内存数据打印rdd.foreach(println)//算子:Operator(算子)//RDD的方法和Scala集合对象的方法不一样//集合对象的方法都是在同一个节点的内存中完成的//RDD的方法可以将计算逻辑发送到Executor端(分布式节点)执行//为了区分不同的处理效果,所以将RDD的方法称之为算子//RDD的方法外部的操作都是在Driver端执行的,而方法内部的逻辑代码是在EXecutor端执行sc.stop()}
}

Spark的RDD行动算子相关推荐

  1. Spark的RDD转换算子

    目录 RDD转换算子 Value 类型 1. map 2. mapPartitions map 和mapPartitions 的区别 3. mapPartitionsWithIndex 4. flat ...

  2. spark常用RDD算子 汇总(java和scala版本)

    github: https://github.com/zhaikaishun/spark_tutorial  spark RDD的算子挺多,有时候如何灵活的使用,该如何用一下子想不起来,这一段时间将s ...

  3. spark之RDD的转换算子与行为算子的具体使用

    文章目录 1.Transform算子 1.1 map 1.2 flatmap 1.3 groupBy和groupBykey 1.4 filter 1.5 Mappartitions 1.6 mapVa ...

  4. Spark:RDD编程总结(概述、算子、分区、共享变量)

    目录 1.RDD概述 1.1.RDD是什么 1.2.RDD的弹性 1.3.RDD的特点 1.3.1.分区 1.3.2.只读 1.3.3.依赖 1.3.4.缓存 1.3.5.检查点 2.RDD编程 2. ...

  5. Spark学习笔记(7)——RDD行动算子

    RDD方法又称RDD算子. 算子 : Operator(操作) RDD的方法和Scala集合对象的方法不一样,集合对象的方法都是在同一个节点的内存中完成的.RDD的方法可以将计算逻辑发送到Execut ...

  6. Spark的RDD持久化

    RDD持久化 1. RDD Cache 缓存 说明 RDD 通过Cache 或者Persist 方法将前面的计算结果缓存,默认情况下会把数据以缓存在JVM 的堆内存中.但是并不是这两个方法被调用时立即 ...

  7. Spark的RDD依赖关系

    RDD依赖关系 RDD 血缘关系 RDD 只支持粗粒度转换,即在大量记录上执行的单个操作.将创建 RDD 的一系列Lineage(血统)记录下来,以便恢复丢失的分区.RDD 的Lineage 会记录R ...

  8. Spark的RDD序列化

    RDD序列化 1. 闭包检查 从计算的角度, 算子以外的代码都是在Driver 端执行, 算子里面的代码都是在 Executor端执行.那么在 scala 的函数式编程中,就会导致算子内经常会用到算子 ...

  9. Spark _07_补充部分算子【二】

    接Spark _06_补充部分算子[一] https://blog.csdn.net/qq_41946557/article/details/102673673 scala API package d ...

最新文章

  1. SQL DEVELOPER 打不开了
  2. 《数学之美》第11章 如何确定网页和查询的相关性
  3. vue2 + vue-router + vuex + iview 入门项目
  4. quercus mysql_14.5 Quercus 原理及展望
  5. SQL大数据查询优化
  6. python报错 TypeError: an integer is required
  7. 一台电脑上同启动两个Tomcat的方式,windows/Linux配置。
  8. 8.基本数据结构-顺序表和链表
  9. JAVA入门级教学之(switch语句)
  10. 互联网晚报 | 11月20日 星期六 | 阿里云单季营收首次超200亿;淘特年度活跃用户超2.4亿;首届中国网络文明大会在京召开...
  11. tp摄像头的默认地址_tp-link怎么设置无线桥接 tp-link设置无线桥接方法【图文】...
  12. 如何解决”ArcGIS Server Site is currently being configured by another administrative operation“的问题
  13. mysql中datetime有带时区_当服务器时区不是UTC时,从Java中检索来自MySQL的UTC DATETIME字段...
  14. C语言在工业工程专业的应用,工业工程专业知识介绍
  15. 【配置关系】—Entity Framework实例详解
  16. 项目管理之项目周报模板
  17. PDF虚拟打印机的功能详解和使用方法
  18. 奥的斯自动人行道服务器密码,奥的斯服务器中文说明21页
  19. 主分区、扩展分区、逻辑分区和活动分区的区别与联系
  20. ZYNQ的Linux Linaro系统镜像制作SD卡启动

热门文章

  1. Vivado各个过程产生的文件与ISE的对比
  2. 信道编码之差错控制方式
  3. 利用VSTS工具自动测试
  4. 自学web前端的方法都有哪些?新手怎么学HTML5
  5. cmd xcopy进行远程复制
  6. 逻辑回归损失函数(cost function)
  7. @ConditionalOnProperty 详解
  8. Tornado框架中视图模板Template的使用
  9. 饥荒 死亡后不删存档的办法
  10. hdu 3732(01背包转多重背包)