文章目录

  • 1、Transform算子
    • 1.1 map
    • 1.2 flatmap
    • 1.3 groupBy和groupBykey
    • 1.4 filter
    • 1.5 Mappartitions
    • 1.6 mapValues
    • 1.7 sort
    • 1.8 simple
    • 1.9 union
  • 2、 Actions算子
    • 2.1 count,collect,reduce,save,lookup
    • 2.2 foreach 和 foreachPartition

1、Transform算子

1.1 map

package com.shujia.coreimport org.apache.spark.rdd.RDD
import org.apache.spark.{SparkConf, SparkContext}/*** @return :danzhu努力学bigdata**         1、转换算子:由一个RDD变成另一个RDD,是RDD之间的转换,是懒执行的**         2、行为算子:由一个RDD调用,但是最后没有返回新的RDD,而是返回了其他数据类型*                    行为算子可以触发任务的执行,每个actions 算子都会触发一个任务**/
object SparkRDDMap {def main(args: Array[String]): Unit = {/*** map算子*///创建spark的上下文环境val conf: SparkConf = new SparkConf()conf.setAppName("SparkRDDdemo1")conf.setMaster("local")val sc: SparkContext = new SparkContext(conf)/*** RDD的创建方式:* 1、通过textFile读取文件* 2、通过集合创建RDD(一般用于测试)*///textFile是读取文件的RDD形式,parallelize是创建一个list集合的方式/*** map是转换算子,是懒执行的,*需要接收一个函数f:参数为RDD中的泛型,返回值类型自定* 会将每一条数据一次传给函数f进行转换* 最终整个map方法完成后会返回一个新的RDD**/val listRDD: RDD[Int] = sc.parallelize(List(1, 2, 3, 4, 5, 6, 7, 8, 9))/*** 在算子的外部是在driver端运行的,而在算子内部是在executor端执行的*///println("map之前")val mapRDD: RDD[Int] = listRDD.map(i => {println("i的值" + i)i * 20})//println("map之后")
//    mapRDD.foreach(println)
//    listRDD.foreach(println)val JiShuRDD: RDD[Int] = listRDD.filter(i => {var flag: Boolean = falseif (i % 2 == 1) {flag = true}flag})JiShuRDD.foreach(println)while (true){}}}

1.2 flatmap

package com.shujia.coreimport org.apache.spark.rdd.RDD
import org.apache.spark.{SparkConf, SparkContext}import scala.collection.mutable/*** @return :danzhu努力学bigdata*/
object SparkRDDFlatmap {def main(args: Array[String]): Unit = {/*** flatmap算子:转换算子** 需要接受一个函数f:参数类型同RDD中的泛型,返回值类型是集合、数组、序列、迭代器(数据容器)* 会将每一条数据依次传递给函数f进行转换,还会将函数f返回的数据容器进行扁平化处理(展开)* 得到一个新的RDD*///创建上下文环境val conf: SparkConf = new SparkConf()conf.setAppName("SparkRDDFlatmap")conf.setMaster("local")val sc: SparkContext = new SparkContext(conf)val lineRDD: RDD[String] = sc.parallelize(List("java,java,scala,python","hadoop,hive,hbase","spark,filk,MapReduce"))val splitsRDD: RDD[String] = lineRDD.flatMap(word => {word.split(",")})val groupByRDD: RDD[(String, Iterable[String])] = splitsRDD.groupBy(word=>word)val wordcountRDD : RDD[(String, Int)] = groupByRDD.map(kv => {val key: String = kv._1val value: Iterable[String] = kv._2val size: Int = value.size(key, size)})wordcountRDD.foreach(println)groupByRDD.foreach(println)splitsRDD.foreach(println)lineRDD.foreach(println)}}

1.3 groupBy和groupBykey

package com.shujia.coreimport org.apache.spark.{SparkConf, SparkContext}
import org.apache.spark.rdd.RDD/*** @return :danzhu努力学bigdata*/
object SparkRDDGroupBy {def main(args: Array[String]): Unit = {/*** groupBy:转换算子:需要制定按什么排名***///创建spark上下文的环境val conf: SparkConf = new SparkConf()conf.setAppName("SparkRDDGroupBy")conf.setMaster("local")val sc: SparkContext = new SparkContext(conf)//读取students数据val lineRDD: RDD[String] = sc.textFile("spark/data/stu/students.txt")//统计班级人数val clazzRDD: RDD[(String, Int)] = lineRDD.map(line => (line.split(",")(4), 1))//clazzRDD.foreach(println)//按班级分组val groupRDD: RDD[(String, Iterable[(String, Int)])] = clazzRDD.groupBy(kv => kv._1)// groupRDD.foreach(println)//统计班级人数val sum_clazzRDD: RDD[(String, Int)] = groupRDD.map {case (key: String, iter: Iterable[(String, Int)]) => {val clazz_sum: Int = iter.map(lin => lin._2).sum(key, clazz_sum)}}//sum_clazzRDD.foreach(println)/*** groupBykey:转换算子** 分区类算子:分区类算子只能作用在k-v格式的RDD上** 在当前程序中,lineRDD不是K-V格式,所以没有groupBykey算子* groupBykey算子默认按照key进行分组结果同groupBy类似担忧细微的差异* 这两个group算子都会返回kv格式* k:指定的分组字段(groupBy)、k-v格式的RDD的key(groupBykey)* V:符合相同分组条件的一个整体(groupBy),只会返回value(groupBykey)*/val groupByKeyRDD: RDD[(String, Iterable[Int])] = clazzRDD.groupByKey()groupByKeyRDD.map(kv=>(kv._1,kv._2.sum)).foreach(println)}}

1.4 filter

package com.shujia.coreimport org.apache.spark.rdd.RDD
import org.apache.spark.{SparkConf, SparkContext}/**** @return :danzhu努力学bigdata*/
object SparkRDDFilter{def main(args: Array[String]): Unit = {/*** filter:转换算子*  需要接收一个函数f:餐胡类型同RDD中的泛型,返回值类型时Boolean类型*  会根据函数f的返回值对数据进行过滤*   如果返回true则保留数据,返回false则将数据过滤*///创建Spark上下环境val conf: SparkConf = new SparkConf()conf.setAppName("SparkRDDFilter")conf.setMaster("local")val sc: SparkContext = new SparkContext(conf)//读取students数据val lineRDD: RDD[String] = sc.textFile("spark/data/stu/students.txt")//过滤除理科班学生lineRDD.filter(line=>{val splits: Array[String] = line.split(",")//startsWith是字符串中以某某为前缀的方法splits(4).startsWith("理科")}).foreach(println)}
}

1.5 Mappartitions

package com.shujia.coreimport org.apache.spark.rdd.RDD
import org.apache.spark.{SparkConf, SparkContext}/*** @return :danzhu努力学bigdata*/
object SparkRDDMappartitions {def main(args: Array[String]): Unit = {/*** Mappartitions:转换算子**///创建上下文的环境val conf: SparkConf = new SparkConf()conf.setAppName("SparkRDDMappartitions")conf.setMaster("local")val sc: SparkContext = new SparkContext(conf)val lineRDD: RDD[String] = sc.textFile("spark/data/words")//对每一个分区的数据进行处理,这里有三份文件,既有三个分区,每一个分区至少对应一个task//适用于在算子内部需要跟外部数据源建立连接的情况//通过mapPartitions这种方式可以减少连接创建的次数,顺便提高运行效率/*** 使用迭代器是因为迭代器只能迭代一次就没有了(自动消失),而list可以迭代多次,* list在内存里,迭代器是要用的时候才会有,所以不会占用内存过多**/lineRDD.mapPartitions((iter: Iterator[String]) => {println("map partitions") //打印三次//迭代器也有map等方法iter.flatMap(line => {line.split(",")})}).foreach(println)//对每一条数据进行处理,假设有N条数据//如果需要在map中例如去请求mysql的数据(一般创建连接是为了获取数据),那么会与mysql建立N次连接//会导致运行效率较低,甚至会导致mysql建立的连接数达到上限,出现性能问题lineRDD.map(line => {println("map")val strings: Array[String] = line.split(",")strings}).foreach(println)lineRDD.mapPartitionsWithIndex((index,iter)=>{println("当前的分区索引:"+index)iter.flatMap(line=>line.split(",0"))}).foreach(println)}}

1.6 mapValues

package com.shujia.coreimport org.apache.spark.rdd.RDD
import org.apache.spark.{SparkConf, SparkContext}/**** @return :danzhu努力学bigdata*/
object SparkRDDMapvalues {def main(args: Array[String]): Unit = {//创建spark上下文的环境val conf: SparkConf = new SparkConf()conf.setAppName("SparkRDDGroupBy")conf.setMaster("local")val sc: SparkContext = new SparkContext(conf)//只能作用在k-v格式的RDD上,相当于对values进行遍历val rdd: RDD[(String, Int)] = sc.parallelize(List(("张三",1),("李四",2),("王五",3)))rdd.mapValues(i=>i*i).foreach(println)}}

1.7 sort

package com.shujia.coreimport org.apache.spark.rdd.RDD
import org.apache.spark.{SparkConf, SparkContext}/**** @return :danzhu努力学bigdata*/
object SparkRDDsort {def main(args: Array[String]): Unit = {//创建spark上下文的环境val conf: SparkConf = new SparkConf()conf.setAppName("SparkRDDGroupBy")conf.setMaster("local")val sc: SparkContext = new SparkContext(conf)val stuRDD: RDD[String] = sc.textFile("spark/data/stu/students.txt")//按照年龄排序,倒序//ascending 默认升序排序stuRDD.sortBy(stu=>stu.split(",")(2),ascending = false).foreach(println)}}

1.8 simple

package com.shujia.coreimport org.apache.spark.rdd.RDD
import org.apache.spark.{SparkConf, SparkContext}/**** @return :danzhu努力学bigdata*/
object SparkRDDSample {def main(args: Array[String]): Unit = {/*** sample:转换算子* withReplacement:有无放回* fraction:抽样比例(最终抽样出来的数据量大致等于抽样比例)**///创建spark上下文环境val conf: SparkConf = new SparkConf()conf.setAppName("SparkRDDSimple")conf.setMaster("local")val sc: SparkContext = new SparkContext(conf)val lineRDD: RDD[String] = sc.textFile("spark/data/stu/students.txt")val sampleRDD: RDD[String] = lineRDD.sample(false, 0.2)sampleRDD.foreach(println)}
}

1.9 union

package com.shujia.coreimport org.apache.spark.rdd.RDD
import org.apache.spark.{SparkConf, SparkContext}/*** @return :danzhu努力学bigdata*/
object SparkRDDUnion {def main(args: Array[String]): Unit = {//创建spark上下文的环境val conf: SparkConf = new SparkConf()conf.setAppName("SparkRDDGroupBy")conf.setMaster("local")val sc: SparkContext = new SparkContext(conf)//通过集合创建RDD/*** union:转换算子*///两个RDD union格式必须一致val rdd1: RDD[Int] = sc.parallelize(List(1,2,3,4,5,6))val rdd2: RDD[Int] = sc.parallelize(List(4,5,6,7,8,9))rdd1.union((rdd2)).foreach(println)}}

2、 Actions算子

2.1 count,collect,reduce,save,lookup

package com.shujia.coreimport org.apache.spark.{SparkConf, SparkContext}
import org.apache.spark.rdd.RDD/**** @return :danzhu努力学bigdata*/
object SparkRDDAction {def main(args: Array[String]): Unit = {//创建spark上下文的环境val conf: SparkConf = new SparkConf()conf.setAppName("SparkRDDGroupBy")conf.setMaster("local")val sc: SparkContext = new SparkContext(conf)//读取students、scores数据val stuRDD: RDD[String] = sc.textFile("spark/data/stu/students.txt")//foreach 没有返回值,会触发job//需要接收一个函数f:参数为RDD中的泛型,返回值类型为UnitstuRDD.foreach(println)/*** count:行为算子* 统计RDD中的数据* 注意:RDD中不保存数据,RDD只是spark的一种编程模型**/println(stuRDD.count())/*** collect:行为算子。将RDD 中的数转换为scala中的数组*//*** 在一个RDD中不能直接使用另一个RDD* 1、RDD是一个抽象的编程模型,没有实现序列化的* 2、如果在Task中使用另一个RDD,那么这个RDD的转换以及Action由谁进行调度和申请资源呢**/val stuArr: Array[String] = stuRDD.collect()val blackListRDD: RDD[String] = sc.parallelize(List("1500100001","1500100007","1500100009"))//我们可以在算子外部先调用collect方法然后再算子内部调用val ListRDD: Array[String] = blackListRDD.collect()stuRDD.filter(stu=>{ListRDD.contains(stu.split(",")(0))}).foreach(println)/*** reduce:行为算子**///传入一个聚合函数//select sum(age) from students group by 1//全局的聚合(将所有的数据作为一个组进行聚合)stuRDD.map(line=>line.split(",")(2)).reduce((i,j)=>i+j).foreach(println)/*** save:*/stuRDD.saveAsTextFile("")/*** lookup:作用在k-v格式的RDD上,传入一个key,返回与之对应的value**/val ids: Seq[String] = stuRDD.map(line => (line.split(",")(1), line.split(",")(0))).lookup("宣谷芹")println(ids)}
}

2.2 foreach 和 foreachPartition

package com.shujia.coreimport java.sql.{Connection, DriverManager, PreparedStatement}import org.apache.spark.rdd.RDD
import org.apache.spark.{SparkConf, SparkContext}/*** @return :danzhu努力学bigdata*/
object SparkRDDForeach {def main(args: Array[String]): Unit = {/*** foreach、foreachPartition都是行为算子* foreach:需要接收一个函数f:参数类型同RDD中的泛型,返回值类型Unit* 会将每一条数据依次传递给函数f进行最终的一个处理,一般用于输出打印(测试)** foreachPartion:需要接收一个函数f:参数类型是iterator类型,返回值类型Unit*  会将每个分区的数据传给Trerator并进行最终的处理,一般用于将结果保存到外部系统(mysql)** 注意:一般算子后面跟着partition的算子,参数类型一般是迭代器(减少占用内存),*      这样的会迭代器还要调用其他普通算子一次*///创建上下文环境val conf: SparkConf = new SparkConf()conf.setAppName("SparkRDDForeach")conf.setMaster("local")val sc: SparkContext = new SparkContext(conf)//读取数据,设置了是个分区val lineRDD: RDD[String] = sc.textFile("spark/data/stu/students.txt", 4)println(lineRDD.getNumPartitions)//创建mysql连接//遍历每一条数据//因为不需要返回值,所以选择foreach行为算子遍历/*** 算子外部的代码在Driver端执行的* 算子内部的代码是以Task的形式发送到Executor中执行的* 连接是不能被序列化的,所以连接的建立需要放入算子内部** //      *///    lineRDD.foreach(line=>{//      //连接是不能被序列化的,所以连接的建立需要放入算子内部//      //foreach是针对每一条数据处理一次,相当于这里会创建1000次连接,会造成性能问题//      val conn: Connection = DriverManager.getConnection("jdbc:mysql://master:3306/test?useUnicode=true&characterEncoding=UTF-8","root","123456")//      val ps: PreparedStatement = conn.prepareStatement("insert into student2 values(?,?,?,?,?)")//      val splits: Array[String] = line.split(",")//      val id: Int = splits(0).toInt//      val name: String = splits(1)//      val age: Int = splits(2).toInt//      val gender: String = splits(3)//      val clazz: String = splits(4)//      ps.setInt(1,id)//      ps.setString(2,name)//      ps.setInt(3,age)//      ps.setString(4,gender)//      ps.setString(5,clazz)//      ps.execute()//      ps.close()//      conn.close()//    })/*** 可以使用foreachpartition代替foreach完成对mysql数据的插入*   适用于在算子内部需要跟外部数据源建立连接(一般创建连接是为了写入数据)的情况** //连接是不能被序列化的,所以连接的建立需要放入算子内部* //foreach是针对每一条数据处理一次,相当于这里会创建1000次连接,会造成性能问题* //对每个分区的数据进行处理,相当于每个分区建立一次连接,因为有是个分区,所以只会创建四次连接* //大大降低连接的次数,提高性能****/lineRDD.foreachPartition(iter => {val conn: Connection = DriverManager.getConnection("jdbc:mysql://master:3306/test?useUnicode=true&characterEncoding=UTF-8", "root", "123456")val ps: PreparedStatement = conn.prepareStatement("insert into student2 values(?,?,?,?,?)")//这里的foreach方法实际上不是RDD的算子,这里是Iterator的foreach方法//不会出现连接未被序列化的问题,当前处理的分区数据都会共用一个连接iter.foreach(line => {val splits: Array[String] = line.split(",")val id: Int = splits(0).toIntval name: String = splits(1)val age: Int = splits(2).toIntval gender: String = splits(3)val clazz: String = splits(4)ps.setInt(1, id)ps.setString(2, name)ps.setInt(3, age)ps.setString(4, gender)ps.setString(5, clazz)//相当于每条数据插入一次,性能也比较低//ps.execute()ps.addBatch()})//采用批量插入的方式ps.executeBatch()ps.close()conn.close()})}}

说明:代码中所涉及到的数据,可以联系本人获取

spark之RDD的转换算子与行为算子的具体使用相关推荐

  1. Spark的RDD转换算子

    目录 RDD转换算子 Value 类型 1. map 2. mapPartitions map 和mapPartitions 的区别 3. mapPartitionsWithIndex 4. flat ...

  2. Spark函数详解系列--RDD基本转换

    http://www.cnblogs.com/MOBIN/p/5373256.html 摘要: RDD:弹性分布式数据集,是一种特殊集合 ‚ 支持多种来源 ‚ 有容错机制 ‚ 可以被缓存 ‚ 支持并行 ...

  3. mappartitions java_Spark算子:RDD基本转换操作(5)–mapPartitions、mapPartitionsWithIndex

    关键字:Spark算子.Spark RDD基本转换.mapPartitions.mapPartitionsWithIndex mapPartitions def mapPartitions[U](f: ...

  4. spark中的转换算子和行动算子区别(transformations and actions)

    算子(RDD Operations): 对于初学者来说,算子的概念比较抽象,算子可以直译为 "RDD的操作", 我们把它理解为RDD的方法即可 . 转换算子(transformat ...

  5. Spark 常用算子详解(转换算子、行动算子、控制算子)

    Spark简介 Spark是专为大规模数据处理而设计的快速通用的计算引擎: Spark拥有Hadoop MapReduce所具有的优点,但是运行速度却比MapReduce有很大的提升,特别是在数据挖掘 ...

  6. 详解 Spark RDD 的转换操作与行动操作

    前言 本期继续讲解 Spark 核心 RDD 编程部分,内容比较干货也比较长,建议大家先收藏. 学习目标 RDD 的创建 RDD 的转换操作 RDD 的行动操作 惰性求值 1. RDD 的创建 Spa ...

  7. Spark:RDD编程总结(概述、算子、分区、共享变量)

    目录 1.RDD概述 1.1.RDD是什么 1.2.RDD的弹性 1.3.RDD的特点 1.3.1.分区 1.3.2.只读 1.3.3.依赖 1.3.4.缓存 1.3.5.检查点 2.RDD编程 2. ...

  8. spark的RDD中的action(执行)和transformation(转换)两种操作中常见函数介绍

    参考文章:spark的RDD中的action(执行)和transformation(转换)两种操作中常见函数介绍 spark常见的RDD 1. 函数概览 2. 常见的Transformations 操 ...

  9. Spark的RDD行动算子

    目录 基本概念 算子介绍 1. reduce 2. collect 3. count 4. first 5. take 6. takeOrdered 案例实操1-6 7. aggregate 8. f ...

最新文章

  1. 程序员到了35 岁就要被裁员?
  2. 沈向洋:从深度学习到深度理解
  3. Spark1.0.0 开发环境高速搭建
  4. mysql 将时间戳直接转换成日期时间,mysql查询某一天的数据。
  5. 漫漫MySQL之路(1.MySQL简介和诞生)
  6. java父类子类顺序_java父类子类內部程序的执行顺序
  7. orcale 之 集合操作
  8. ffmpeg-win32-v3.2.4 下载_MVBOX下载|MVBOX 7.1.0.4官方版
  9. java游戏可以刷升级挖药材,【毕业设计】Java手机游戏设计
  10. 查询mysql各个库和表的大小并按大小输出
  11. mysql join 索引 无效_ORACLE MYSQL中join 字段类型不同索引失效的情况-阿里云开发者社区...
  12. 查看HTML请求(request)中的标头(Headers)信息
  13. Flash 二进制传图片到后台Java服务器接收
  14. es6 for(var item of list)
  15. 求助:CISCO2811DHCP中继配置
  16. 马里兰大学本科计算机科学,2020年马里兰大学本科专业设置
  17. java使用httpclient简单模拟登陆微信公众开放平台
  18. FOC学习之路——硬件电路(一)
  19. android iOS App客户端如何实现在线支付
  20. [网络安全自学篇] 七十三.WannaCry勒索病毒复现及分析(四)蠕虫传播机制全网源码详细解读

热门文章

  1. 从新东方被裁转行互联网。月薪过万,入职百度外包的真实感受。
  2. ifix自定义声音报警(自动执行脚本)
  3. OpenFeign 如何做到 隔空取物 ?
  4. 推荐系统-task03-矩阵分解
  5. Android是虚拟机运行后总是显示:Unfortunately XXX has stopped.
  6. 哪个Linux ATA 硬盘,linux – scsi和/ dev / disk / by-id下相同硬盘的ata条目
  7. 昆明睿正科技有限公司-创业之初
  8. 打破冯·诺依曼结构,中国的类脑芯片已经来了!
  9. 学不止境,学然后知不足,教然后知困
  10. 《思维力—高效的系统思维》