Spark部分算子及使用

案例一:flatmap算子

def main(args: Array[String]): Unit = {val conf = new SparkConf().setAppName("test1").setMaster("local[1]")val sc = new SparkContext(conf)//UDAF 一对多 炸裂val list: List[String] = List("wuyanzu,pengyuyan,liushishi,zhangjunning")val listRDD: RDD[String] = sc.parallelize(list)//扁平化算子val flatMapRDD: RDD[String] = listRDD.flatMap(line => line.split(","))flatMapRDD.foreach(println)
}

案例二:map算子

def main(args: Array[String]): Unit = {val conf = new SparkConf().setAppName("test1").setMaster("local[1]")val sc = new SparkContext(conf)
//一对一 val list:Seq[Int] = 1 to 7val listRDD: RDD[Int] = sc.parallelize(list)//每个元素*7val mapRDD: RDD[Int] = listRDD.map(e => e * 7)mapRDD.foreach(println)}

案例三:filter算子

对父RDD中每一个元素数据执行对应的function函数,如果函数返回true保留当前父RDD中的元素数据,如果为false,过滤掉当前数据

def main(args: Array[String]): Unit = {val conf = new SparkConf().setAppName("test1").setMaster("local[1]")val sc = new SparkContext(conf)var list:List[String] = List("1,jiajingwen,18,0","2,pengyuyan,20,1","3,gaoyuanyuan,28,0")val listRDD: RDD[String] = sc.parallelize(list)//选出女生val filterRDD = listRDD.filter(e => {val strList: Array[String] = e.split(",")strList(3).equals("0")})filterRDD.foreach(println)
}

案例四:sample算子(从父RDD抽取一定量的数据)

数据倾斜调研的时候会用到sample,比如有10TB的数据统计key的个数,会很耗时,可以用抽样,随机抽样用部分的数据代表整体
reducebykey【相同的key拉取到同一节点,不同的key可能拉取到不同节点】

sample(false, 0.2, 1)

1.withReplacement=true:有放回抽样(withReplacement=false无放回抽样)

2.fraction=0.2:不一定就抽取20%,会在20%这个值上下浮动

3.seed=1:随机种子(固定随机模式)

def main(args: Array[String]): Unit = {val conf = new SparkConf().setAppName("test1").setMaster("local[1]")val sc = new SparkContext(conf)val list:Seq[Int]= 1 to 10000val listRDD: RDD[Int] = sc.parallelize(list)val sampleRDD: RDD[Int] = listRDD.sample(false, 0.2, 1)sampleRDD.foreach(println)sc.stop()
}

案例五:union(数据合并)

若stage执行时间特别长:

1.检查是否存在shuffle型算子,可能存在数据倾斜

整体数据进行拆分:一部分key没有倾斜【代码逻辑处理】,一部分key倾斜【代码逻辑处理】,最终结果进行union=》union

2.打开网页查看当前stage没有shuffle算子,则可能你的资源太少,数据量太大。
分配更多的资源(内存、cpu)

def main(args: Array[String]): Unit = {val conf = new SparkConf().setAppName("test1").setMaster("local[1]")val sc = new SparkContext(conf)val listRDD1: RDD[Int] = sc.parallelize(1 to 5 )val listRDD2: RDD[Int] = sc.parallelize(3 to 10 )val unionRDD: RDD[Int] = listRDD1.union(listRDD2)unionRDD.foreach(println)sc.stop()
}

案例六:join(表连接:内连接、外连接(左外右外全外)、笛卡尔积)

【innerjoin】

def main(args: Array[String]): Unit = {val conf = new SparkConf().setAppName("test1").setMaster("local[1]")val sc = new SparkContext(conf)val stu: List[String] = List("0 欧豪 20 bj", "1 沙溢 22 bj","2 海清 35 bj","3 闫妮 30 sz","4 萧亚轩 18 tb")val score: List[String] = List("1 math 10","2 math 20","3 math 30","4 math 22","5 english 30")//如果想使用join,RDD泛型应该是tuple2,RDD[(key,value)]val stuRDD: RDD[String] = sc.parallelize(stu)val scoreRDD: RDD[String] = sc.parallelize(score)val stuMapRDD: RDD[(String, String)] = stuRDD.map(stuLine => {val sid = stuLine.substring(0, 1)val stuInfo = stuLine.substring(1).trim(sid, stuInfo)})val scoreMapRDD: RDD[(String, String)] = scoreRDD.map(scoreLine => {val fields: Array[String] = scoreLine.split(" ")val sid = fields(0)val scoreInfo = fields(1) + " " + fields(2)(sid, scoreInfo)})val joinRDD: RDD[(String, (String, String))] = stuMapRDD.join(scoreMapRDD)//joinRDD.foreach(println)joinRDD.foreach(t=>{println(s"sid:${t._1}   stuInfo:${t._2._1}   scoreInfo:${t._2._2}")})//注意用大括号
//    joinRDD.foreach{//      case (sid,(stuInfo,scoreInfo))=>
//        println(sid+" "+stuInfo+" "+scoreInfo)
//    }sc.stop()
}

【左外、右外连接】

val leftOuterJoinRDD: RDD[(String, (String, Option[String]))] = stuMapRDD.leftOuterJoin(scoreMapRDD)
//右外连接,左表的数据可能为空,所以用了option
val rightOuterJoinRDD: RDD[(String, (Option[String], String))] = stuMapRDD.rightOuterJoin(scoreMapRDD)
rightOuterJoinRDD.foreach(println)sc.stop()

左外结果:

(4,(萧亚轩 18 tb,Some(math 22)))
(0,(欧豪 20 bj,None))
(2,(海清 35 bj,Some(math 20)))
(3,(闫妮 30 sz,Some(math 30)))
(1,(沙溢 22 bj,Some(math 10)))

右外结果:

(4,(Some(萧亚轩 18 tb),math 22))
(5,(None,english 30))
(2,(Some(海清 35 bj),math 20))
(3,(Some(闫妮 30 sz),math 30))
(1,(Some(沙溢 22 bj),math 10))

全外连接:

val fullOuterJoin: RDD[(String, (Option[String], Option[String]))] = stuMapRDD.fullOuterJoin(scoreMapRDD)fullOuterJoin.foreach{case (sid,(stuOption,scoreOption))=>{println(s"sid=${sid}"+s"stuInfo=${stuOption.getOrElse("-")} "+s"scoreInfo=${scoreOption.getOrElse("-")}")}}sc.stop()

案例七:交集、笛卡尔积

def main(args: Array[String]): Unit = {val conf =new SparkConf().setAppName("test1").setMaster("local[1]")val sc = new SparkContext(conf)val left: RDD[Int] = sc.parallelize(List(1, 2, 3))val right: RDD[Int] = sc.parallelize(List(3, 4, 5, 6))//val intersectionRDD: RDD[Int] = left.intersection(right)//交集//intersectionRDD.foreach(println)val cartesianRDD: RDD[(Int, Int)] = left.cartesian(right)//笛卡尔积cartesianRDD.foreach(println)
}

案例八:groupbykey【父RDD的泛型一般是Tuple】

def main(args: Array[String]): Unit = {val conf =new SparkConf().setAppName("test1").setMaster("local[1]")val sc = new SparkContext(conf)var list = List("1,刘诗诗,22,bj","1,吴奇隆,45,bj","2,于晓光,30,hz","2,秋瓷炫,31,hz")val listRDD: RDD[String] = sc.parallelize(list)val mapRDD: RDD[(String,String)] =listRDD.map(userInfo=>{val sid: String = userInfo.split(",")(0)(sid,userInfo)})val groupByKeyRDD: RDD[(String, Iterable[String])] = mapRDD.groupByKey()groupByKeyRDD.foreach{case (sid,infos)=>{println(s"sid=${sid}------>infos=${infos}")}}sc.stop()

案例九:reducebykey

对比:
groupbykey是把key相同的数据拉取到同一组;reducebykey是把key相同的数据拉取到同一组,对外提供了对同组数据操作的接口(可以让我们写程序操作组内的数据)

def main(args: Array[String]): Unit = {val conf =new SparkConf().setAppName("test1").setMaster("local[1]")val sc = new SparkContext(conf)var list = List("sun hong lei","sun xiang yang")val listRDD: RDD[String] = sc.parallelize(list)val flatMapRDD: RDD[String] = listRDD.flatMap(line => line.split(","))val mapRDD: RDD[(String, Int)] = flatMapRDD.map(word => (word, 1))val reduceRDD: RDD[(String, Int)] = mapRDD.reduceByKey((v1, v2) => v1 + v2)sc.stop()
}

案例十:Distinct

def f1: Unit ={val conf =new SparkConf().setAppName("test1").setMaster("local[1]")val sc = new SparkContext(conf)var list: List[String] = List("海清","小沈阳","赵四","赵四")val listRDD: RDD[String] = sc.parallelize(list)listRDD.distinct().foreach(println)sc.stop()
}

案例十一:调优算子(distinct、group by)

  * 柯里化* def aggregateByKey[U: ClassTag]*     (zeroValue: U)  // 初始值  shuffle写这部分起作用*      (seqOp: (U, V) => U,  shuffle写这部分起作用  ,局部聚合*       combOp: (U, U) => U) shuffle读这部分起作用  ,全局聚合*      : RDD[(K, U)]*       = self.withScope {* aggregateByKey(zeroValue, defaultPartitioner(self))(seqOp, combOp)* }*   aggregateByKey 算子要比 reducbykey灵活度高*       发现:局部聚合的业务逻辑和全局聚合业务逻辑一模一样(优先 : reducbykey)*       发现: 局部聚合的业务逻辑和全局聚合业务逻辑不一样的时候 (考虑:aggregateByKey | combineByKey)
def f4: Unit ={val conf =new SparkConf().setAppName("test1").setMaster("local[1]")val sc = new SparkContext(conf)var list: List[String] = List("hello hello hello","world world world")val listRDD: RDD[String] = sc.parallelize(list)val flatMapRDD: RDD[String] = listRDD.flatMap(e => e.split(" "))val wordMapRDD: RDD[(String, Int)] = flatMapRDD.map(t => (t, 1))wordMapRDD.aggregateByKey(0)(_+_,_+_).foreach(println)
//    (hello,3)
//    (world,3)sc.stop()}
def f4: Unit ={val conf =new SparkConf().setAppName("test1").setMaster("local[1]")val sc = new SparkContext(conf)var list: List[String] = List("hello hello hello","world world world")val listRDD: RDD[String] = sc.parallelize(list)val flatMapRDD: RDD[String] = listRDD.flatMap(e => e.split(" "))val wordMapRDD: RDD[(String, Int)] = flatMapRDD.map(t => (t, 1))wordMapRDD.combineByKey((num:Int) => num,(sum1:Int,num2:Int)=>sum1+num2//shuffle写的时候起作用(局部聚合),(sum1:Int,sum2:Int)=>sum1+sum2//shuffle读的时候起作用(全局聚合)).foreach(println)sc.stop()
}

总结 : 2.2.2版本 groupbykey 和 reducebykey 底层都是combineByKeyWithClassTag。(—> combineByKey 前面的简略版本 )

案例十二 : Action算子

foreach(迭代)、saveasTextFile(持久化数据)、 count(统计RDD中有多少数据)
collect(拉取数据):慎用,除非数据量比较小、take(取前几个元素、默认没有排序)、takeordered(取前几个元素,有默认排序,首个元素排序)、saveasobjectfile(把对象写到文件)、saveastextfile(把处理好的结果【字符串】写到文本)、countbbykey(按照key分组)、takeordered(主要用于排序,可以指定任何字段排序)。

def f3: Unit ={val conf =new SparkConf().setAppName("test1").setMaster("local[1]")val sc = new SparkContext(conf)var list: List[String] = List("海清","小沈阳","赵四","赵四")val listRDD: RDD[String] = sc.parallelize(list)val result: Array[String] = listRDD.collect()println(result.mkString("|")) //海清|小沈阳|赵四|赵四sc.stop()
}
def f4: Unit ={val conf =new SparkConf().setAppName("test1").setMaster("local[1]")val sc = new SparkContext(conf)var list: List[String] = List("海清","小沈阳","赵四","赵四")val listRDD: RDD[String] = sc.parallelize(list)listRDD.saveAsTextFile("C:\\Users\\Administrator\\Desktop\\javaclass\\spark\\09_04_spark\\new")// val counts: Long = listRDD.count()  //计数
//    println(counts)sc.stop()
}
def f1: Unit ={val conf =new SparkConf().setAppName("test1").setMaster("local[1]")val sc = new SparkContext(conf)val fileRDD: RDD[String]= sc.textFile("C:\\Users\\Administrator\\Desktop\\javaclass\\spark\\09_03_spark_RDD\\ww.txt")val mapRDD: RDD[(String, Int)] = fileRDD.flatMap(_.split(",")).map((_, 1))val resultMap: collection.Map[String, Long] = mapRDD.countByKey()resultMap.foreach(println)//reduceByKey ---> RDD ---> 使用其它算子做操作//countByKey ---> Map ---> 后面不能使用其它算子做分布式数据分析sc.stop()
}

Spark部分算子及使用相关推荐

  1. Spark学习之Spark RDD算子

    个人主页zicesun.com 这里,从源码的角度总结一下Spark RDD算子的用法. 单值型Transformation算子 map /*** Return a new RDD by applyi ...

  2. Spark action算子案例

    在上篇文章中,我们对Spark中几种常用的transformation算子通过Java和Scala两种代码分别进行了案例演示,Spark transformation算子案例  而在本文中,我们将继续 ...

  3. spark算子大全glom_(七)Spark Streaming 算子梳理 — repartition算子

    目录 天小天:(一)Spark Streaming 算子梳理 - 简单介绍streaming运行逻辑 天小天:(二)Spark Streaming 算子梳理 - flatMap和mapPartitio ...

  4. Spark RDD算子(八)mapPartitions, mapPartitionsWithIndex

    Spark RDD算子(八) mapPartitions scala版本 java版本 mapPartitionsWithIndex scala版本 java版本 mapPartitions mapP ...

  5. Spark _30_SparkStreaming算子操作Driver HA

    SparkStreaming算子操作 foreachRDD output operation算子,必须对抽取出来的RDD执行action类算子,代码才能执行. import org.apache.sp ...

  6. Spark transformation算子案例

    Spark支持两种RDD操作:transformation和action  在本文中,将对几个常用的transformation算子进行案例演示,采用Java和Scala两种语言对代码进行编写  其中 ...

  7. Spark RDD算子介绍

    Spark学习笔记总结 01. Spark基础 1. 介绍 Spark可以用于批处理.交互式查询(Spark SQL).实时流处理(Spark Streaming).机器学习(Spark MLlib) ...

  8. Spark常用算子讲解一

    map map是对RDD中的每个元素都执行一个指定的函数来产生一个新的RDD,RDD之间的元素是一对一关系. >>> x = sc.parallelize([1,2,3,4]) &g ...

  9. spark RDD算子大全

    目录 map()&&flatMap() map()&&mapPartitions() mapPartitionsWithIndex() filter() take()& ...

最新文章

  1. http提交json格式数据自动加\
  2. ADO.NET入门教程(三) 连接字符串,你小觑了吗?
  3. Jenkins命令可视化
  4. 嵌入式软件常见笔试面试题总结 .
  5. Java生鲜电商平台-SpringCloud微服务架构中分布式事务解决方案
  6. OCP-052考试题库汇总(60)-CUUG内部解答版
  7. Java自动生成增量补丁自动部署_java-Hibernate正在为表生成自动增量交替ID
  8. 不礼让行人怎么抓拍的_榆林机动车斑马线不礼让行人,您被曝光啦
  9. -字符串-搜索和替换--聚合
  10. ASP.NET SignalR 与LayIM配合,轻松实现网站客服聊天室(四) 添加表情、群聊功能...
  11. Android启动过程概述
  12. LAMP兄弟连网络基础视频地址全集!!!
  13. wordpress主题实现彩色标签云效果
  14. mysql workbench修改密码_更改MySQL用户密码
  15. Windows安装AdelaiDet的血与泪
  16. 愿天下有情人都是失散多年的兄妹(25 分)
  17. Win7问题汇总及解答!
  18. apache添加php语言模块,在apache中添加php处理模块-Go语言中文社区
  19. 手机拨打电话显示不在服务器区,疑问丨打电话时为什么提示手机不在服务区?...
  20. 用python turtle画小黄人源码_Python turtle模块小黄人程序

热门文章

  1. 罗丹明PEG羟基,RB-PEG-OH,Rhodamine-PEG-OH
  2. [游戏分析] 游戏逆向
  3. C 小树快长高 SDUT
  4. 浅析MVC、MVP、MVVM 架构
  5. 我希望是一朵花 BY 米拉贾伊【印】
  6. 指针和多重指针的一些理解( 谭浩强的恶梦....... )
  7. html 行高是什么单位,line-height什么意思?line-height带单位与不带单位的区别
  8. 兆讯传媒深交所上市破发:下跌近15% 收盘市值为68亿
  9. MySQL数据库——MySQL字符集和校对规则详解
  10. 【计算机考研择校分析】重庆邮电大学2022考情分析