文章目录

  • 一、Spark作业执行流程(重点)
  • 二、RDD编程
    • 2.1创建RDD的⼆种⽅式:
    • 2.2Transformation算⼦
    • 2.3Action算子
  • 三、简单算子(必须掌握)
    • 3.1 map、mapPartitions、mapPartitionsWithIndex
      • 3.1.1 ⽐较map flatMap mapPartitions mapPartitionsWithIndex
    • 3.2 flatMap
    • 3.3 glom
    • 3.4 mapValue
    • 3.5 filter
    • 3.6keyBy
    • 3.7 groupBy
    • 3.8 groupByKey
    • 3.9 reduceByKey
    • 3.10 foldByKey

一、Spark作业执行流程(重点)

两张图,都一样的内容


源码解析版

二、RDD编程

2.1创建RDD的⼆种⽅式:

1.从集合中创建RDD
只能是Seq类型的,如果不是Seq类型,如Set和Map,则需要使用toList转换成Seq类型
(1)mkRDD()方法
(2)parallize()方法
2.从外部存储创建RDD(常用)
(1)textFile()方法

注意:遍历RDD类型的变量只能使用内置的函数foreach,不能直接使用prinln(rdd),这是因为RDD变量中并不存储数据信息,因此直接打印出来的是空值

object _01_CreateRDD {// 必备,初始化 SparkContextval sc: SparkContext = new SparkContext(new SparkConf().setMaster("local").setAppName("RDD-Create"))def main(args: Array[String]): Unit = {_01_createFromCollection()}// 1.通过集合,创建RDDdef _01_createFromCollection(): Unit = {// 1. makeRDD: 底层实现是parallelizeval rdd1: RDD[Int] = sc.makeRDD(Array(1, 2, 3, 4, 5))val rdd2: RDD[Int] = sc.parallelize(Array(1, 2, 3, 4, 5))// RDD中是不存储数据的println(rdd1)val map: Map[String, String] = Map("name" -> "xiaoming", "age" -> "10")val list: List[(String, String)] = map.toListval rdd3: RDD[(String, String)] = sc.parallelize(list)}// 2.通过文件,创建RDDdef _02_createFromFile(): Unit = {val rdd: RDD[String] = sc.textFile("src\\main\\scala\\day07_spark\\_01_RDD_Create\\_01_CreateRDD.scala")}
}

2.2Transformation算⼦

RDD中的所有转换都是延迟加载的,也就是说,它们并不会直接计算结果。相反的,它们只是记住这些
应⽤到基础数据集(例如⼀个⽂件)上的转换动作。只有当发⽣⼀个要求返回结果给Driver的动作时,
这些转换才会真正运⾏。这种设计让Spark更加有效率地运⾏

2.3Action算子

在RDD上运⾏计算,并返回结果给Driver或写⼊⽂件系统

三、简单算子(必须掌握)

准备工作: 初始化SparkContext

val sc: SparkContext = new SparkContext(new SparkConf().setMaster("local").setAppName("RDD-Basic"))

3.1 map、mapPartitions、mapPartitionsWithIndex

 * 转换算子: map* 对数据进行的映射,将RDD中的每一个元素,都映射为参数函数返回的结果* 对每一个分区中的每一个数据,进行的映射* 转换算子: mapPartitions* 是map算子的变种, 也是对RDD中的数据进行的映射* 和map的区别:*      map算子是作用在每一个分区中的每一个元素上的, mapPartitions是作用在分区上的。*      mapPartitions会将一个分区,作为一个整体,来进行映射* 转换算子: mapPartitionsWithIndex* 类似于mapPartitions,也是对一个分区进行的整体的计算,但是比mapPartitions多出一个分区的下标
    /*** 转换算子: map* 对数据进行的映射,将RDD中的每一个元素,都映射为参数函数返回的结果* 对每一个分区中的每一个数据,进行的映射*/@Test def mapTest(): Unit = {// 存储有数据的集合val array: Array[String] = Array("dog", "cat", "elephent", "lion", "tiger", "monkey")// 通过集合,创建RDD,创建两个分区val rdd: RDD[String] = sc.parallelize(array, 2)// 做数据的映射(需求:将RDD中的每个元素,映射为自己的长度)val rdd1: RDD[Int] = rdd.map(_.length)      // _.length 等价于 str => str.length// 输出rdd1中的数据rdd1.foreach(println)val rdd2: RDD[(String, Int)] = rdd.map(x => (x, x.length))rdd2.foreach(println)}/*** 转换算子: mapPartitions* 是map算子的变种, 也是对RDD中的数据进行的映射* 和map的区别:*      map算子是作用在每一个分区中的每一个元素上的, mapPartitions是作用在分区上的。*      mapPartitions会将一个分区,作为一个整体,来进行映射*/@Test def mapPartitionsTest(): Unit = {// 1. 准备数据val sourceRDD: RDD[String] = sc.parallelize(Array("宋江", "卢俊义", "吴用", "公孙胜", "关胜"), 2)// 2. 元素映射   Iterator[String] => Iterator(String, Int)val rdd1: RDD[(String, Int)] = sourceRDD.mapPartitions(ite => ite.map(x => (x, x.length)))rdd1.foreach(println)}/*** 转换算子: mapPartitionsWithIndex* 类似于mapPartitions,也是对一个分区进行的整体的计算,但是比mapPartitions多出一个分区的下标*/@Test def mapPartitionsWithIndexTest(): Unit = {// 1. 准备数据val sourceRDD: RDD[String] = sc.parallelize(Array("金莲小姐姐", "婆汐小姐姐", "三娘小姐姐", "师师小姐姐"),3)// 2. 需求: 将所有的元素,映射为分区的序号加上原来的名字再加上长度  如:0,金莲小姐姐,5val rdd1: RDD[(Int, String, Int)] = sourceRDD.mapPartitionsWithIndex((index, iterator) => iterator.map(str => (index, str, str.length)))rdd1.foreach(println)}

map结果:

mapPartitons结果:

mapPartitionsWithIndex结果:

3.1.1 ⽐较map flatMap mapPartitions mapPartitionsWithIndex

Spark中,最基本的原则,就是每个task处理⼀个RDD的partition。

MapPartitions操作的优点:
如果是普通的map,⽐如⼀个partition中有1万条数据;ok,那么你的function要执⾏和计算1万次。
但是,使⽤MapPartitions操作之后,⼀个task仅仅会执⾏⼀次function,function⼀次接收所有的partition数据。只要执⾏⼀次就可以了,性能⽐较⾼。

MapPartitions的缺点:可能会OOM。(out of memory)

如果是普通的map操作,⼀次function的执⾏就处理⼀条数据;那么如果内存不够⽤的情况下,⽐如处理了1千条数据了,那么这个时候内存不够了,那么就可以将已经处理完的1千条数据从内存⾥⾯垃圾回收掉,或者⽤其他⽅法,腾出空间来吧。
mapPartition():每次处理⼀个分区的数据,这个分区的数据处理完后,原RDD中分区的数据才能释放,可能导致OOM。所以说普通的map操作通常不会导致内存的OOM异常。

在项⽬中,⾃⼰先去估算⼀下RDD的数据量,以及每个partition的量,还有⾃⼰分配给每个executor
的内存资源。看看⼀下⼦内存容纳所有的partition数据,⾏不⾏。如果⾏,可以试⼀下,能跑通就好。
性能肯定是有提升的。

3.2 flatMap

 * 转换算子: flatMap* 扁平化映射* 扁平化映射是将可迭代变量一个一个取出来,通过相应的函数,然后去除可迭代性质* 针对RDD中的数据map之后的结果是集合的情况,将集合中的元素提取到一个RDD中
    @Test def flatMapTest(): Unit = {// 1. 准备数据val sourceRDD: RDD[String] = sc.parallelize(Array("诸葛亮  周瑜", "鲁肃   陆逊", "郭嘉   荀彧"))// 2. 扁平化映射val res0: RDD[String] = sourceRDD.flatMap(_.split(" +"))res0.foreach(println)val sourceRDD2: RDD[Array[String]] = sc.parallelize(Array(Array("诸葛亮", "周瑜"), Array("鲁肃", "陆逊"), Array("郭嘉", "荀彧")))val res1: RDD[String] = sourceRDD2.flatMap(_.iterator)val sourceRDD3: RDD[Array[String]] = sc.parallelize(Array(Array("诸葛亮  黄月英", "周瑜  小乔"), Array("孙策  大乔", "吕布  貂蝉"), Array("刘备  孙尚香  甘夫人  糜夫人")))// val res2: RDD[String] = sourceRDD3.flatMap(array => array.flatMap(_.split(" +")))val res2: RDD[String] = sourceRDD3.flatMap(_.flatMap(_.split(" +")))res2.foreach(println)}

3.3 glom

 * 转换算子: glom* 可以将每一个分区中的所有的元素,聚合为一个数组,放回之前的分区中* 这个方法,不会改变分区的数量
    @Test def glomTest(): Unit = {// 将1-20的数据,分到4个分区中val sourceRDD: RDD[Int] = sc.parallelize(1 to 20, 4)// 将每一个分区的所有的数据,聚合到一个数组中val res: RDD[Array[Int]] = sourceRDD.glom()res.foreach(arr => println(arr.mkString(", ")))println(res.getNumPartitions)}

3.4 mapValue

 * 转换算子: mapValues* 只针对于PariedRDD(成对的RDD),也就是说KV型的RDD* 其实这里操作的数据,是以元组的形式存在的** 对键值对的值进行的映射,不对键做任何的处理* 注意,这里不能是Map类型的,RDD没有Map类型* 如果是Map类型,则需要使用toList方法将Map转换成RDD[(U,T)]类型
    @Test def mapValuesTest(): Unit = {// val sourceRDD: RDD[String] = sc.parallelize(Array("贾宝玉", "林黛玉", "薛宝钗", "袭人", "探春"))// val rdd1: RDD[(String, Int)] = sourceRDD.map(x => (x, x.length))// val res0: RDD[(String, Int)] = rdd1.mapValues(_ + 10)// res0.foreach(println)val names: Array[String] = Array("贾宝玉", "林黛玉", "薛宝钗", "袭人", "探春")val ages: Array[Int] = Array(20, 18, 21, 19, 17)val pairs: Array[(String, Int)] = names.zip(ages)val sourceRDD: RDD[(String, Int)] = sc.parallelize(pairs)val res1: RDD[(String, Int)] = sourceRDD.mapValues(_ + 20)res1.foreach(println)}

3.5 filter

 * 转换算子: filter* 对RDD中的元素进行过滤,保留满足条件的元素
    @Test def filterTest(): Unit = {val names: Array[String] = Array("贾宝玉", "林黛玉", "薛宝钗", "袭人", "探春")val ages: Array[Int] = Array(20, 18, 21, 19, 17)val pairs: Array[(String, Int)] = names.zip(ages)val sourceRDD: RDD[(String, Int)] = sc.parallelize(pairs)// 保留所有的大于等于20岁的数据val res: RDD[(String, Int)] = sourceRDD.filter(_._2 >= 20)res.foreach(println)//过滤出贾宝玉的name和ageval rdd:RDD[(String,Int)] = sourceRDD.filter(_._1.equals("贾宝玉"))rdd.foreach(value => println(s"name = ${value._1},age = ${value._2}"))}

3.6keyBy

 * 转换算子: keyBy* 将RDD中的元素,按照指定的部分作为键,整体的数据作为值,
    @Test def keyByTest(): Unit = {val names: Array[String] = Array("贾宝玉", "林黛玉", "薛宝钗", "袭人", "探春")val rdd: RDD[String] = sc.parallelize(names)// 需求: 计算每一个人的名字的长度,作为键,以名字本身作为值val rdd1: RDD[(Int, String)] = rdd.keyBy(_.length)rdd1.foreach(println)}

3.7 groupBy

 * 转换算子: groupBy* 对RDD中的元素,按照指定的部分,进行分组
    @Test def groupByTest(): Unit = {val rdd: RDD[String] = sc.parallelize(List("孙悟空", "猪悟能", "金角大王", "银角大王",  "沙悟净", "小白龙","铁扇公主", "玉帝", "昊天金阙无上至尊自然妙有弥罗至真玉皇上帝"), 3)// 需求: 按照字符串的长度,对数据进行分组// 将相同长度的字符串,放到一个分组中val res0: RDD[(Int, Iterable[String])] = rdd.groupBy(_.length)res0.foreach(kv => println(s"key = ${kv._1} value = ${kv._2.mkString(",")}"))}

3.8 groupByKey

 * 转换算子: groupByKey* 针对于PairedRDD,按照键进行分组,相同的键的值,放到一个集合中
    @Test def groupByKeyTest(): Unit = {val rdd: RDD[(String, Int)] = sc.parallelize(Array(("Tom", 12), ("Jerry", 11), ("Tom", 10), ("Jerry", 9), ("dawa", 13), ("shejing", 14)))val res: RDD[(String, Iterable[Int])] = rdd.groupByKey()res.foreach(kv => println(s"key = ${kv._1} value = ${kv._2.mkString(",")}"))}

3.9 reduceByKey

 * 转换算子: reduceByKey* 针对于 RDD[(Key, Value)], 会自动的将相同的键作为一个分组, 将函数的计算逻辑,作用在所有的value身上,对value进行运算
    @Test def reduceByKeyTest(): Unit = {val source: RDD[String] = sc.parallelize(Array("Tom   Jerry     Kate    Tom     Jerry    dawa   erwa   dawa   shejing   shejing  shejing   Tom   Snoopy"))// 需求: 算wordCountval value: RDD[(String, Int)] = source.flatMap(_.split(" +")).map((_, 1))// val res0: RDD[(String, Int)] = value.groupByKey().mapValues(_.size)// val res0:RDD[(String,Int)] = rdd2.reduceByKey((v1,v2) => v1+v2)val res0: RDD[(String, Int)] = value.reduceByKey(_ + _)res0.foreach(println)}

3.10 foldByKey

 * 转换算子: foldByKey* 和reduceByKey差不多,也是针对于KV形式的RDD的操作,每次在进行聚合运算的时候,都会添加上一个默认值
    @Test def foldByKeyTest(): Unit = {// 准备数据源val rdd: RDD[String] = sc.parallelize(Array("诛仙", "光之子", "神墓", "长生界", "斗破苍穹", "武动乾坤", "大主宰", "星辰变", "盘龙", "九鼎", "寸芒"), 2)// 按照长度分组val rdd2: RDD[(Int, String)] = rdd.keyBy(_.length)// val res1: RDD[(Int, String)] = rdd2.reduceByKey(_ + _)// res1.foreach(println)val res1: RDD[(Int, String)] = rdd2.foldByKey("书名: ")(_ + _)res1.foreach(println)}

【大数据开发】SparkCore——Spark作业执行流程、RDD编程的两种方式、简单算子相关推荐

  1. IOS开发基础之团购案例17-xib和UITableView两种方式实现

    IOS开发基础之团购案例17-xib和UITableView两种方式实现 Design By Johnson Shanghai 实现效果 系统和Xcode版本 注意的细节 关键性的代码 // // V ...

  2. spark中自定义UDAF函数实现的两种方式---UserDefinedAggregateFunction和Aggregator

    1.基于UserDefinedAggregateFunction实现平均数的计算 package com.bigdata.wb.sparkimport org.apache.spark.sql.Row ...

  3. 大数据开发认知--spark

    1. Spark rdd生成过程 · Spark的任务调度分为四步 1RDD objects RDD的准备阶段,组织RDD及RDD的依赖关系生成大概的RDD的DAG图,DAG图是有向环图. 2DAG ...

  4. 大数据开发复习Spark篇

    11.spark 11.1.spark介绍 Apache Spark是用于大规模数据处理的统一分析计算引擎 Spark基于内存计算,提高了在大数据环境下数据处理的实时性,同时保证了高容错性和高可伸缩性 ...

  5. 大数据开发:Spark入门详解

    众所周知,Spark 它是专门为大规模数据处理而设计的快速通用计算引擎,因此Spark它在数据的挖掘等领域便有着非常广泛的应用,而从现阶段来讲的话它也已经形成了一个高速发展并且应用相当广泛的生态系统了 ...

  6. Spark 基础 —— RDD(创建 RDD)的两种方式

    弹性分布式数据集(Resilient Distributed Dataset),简称 RDD,RDD 是 Spark 所提供的最基本的数据抽象,代表分布在集群中多台机器上的对象集合.Spark 有两种 ...

  7. spark sql hbase java_Spark 读写 HBase 的两种方式(RDD、DataFrame)

    使用 saveAsHadoopDataset 写入数据 import org.apache.hadoop.hbase.{HBaseConfiguration, HTableDescriptor, Ta ...

  8. springboot项目启动成功后执行一段代码的两种方式

    实现ApplicationRunner接口 package com.lnjecit.lifecycle;import org.springframework.boot.ApplicationArgum ...

  9. spark代码连接hive_Spark SQL入门到实战之(7)spark连接hive(spark-shell和eclipse两种方式)...

    1.在服务器(虚拟机)spark-shell连接hive 1.1 将hive-site.xml拷贝到spark/conf里 cp /opt/apache-hive-2.3.2-bin/conf/hiv ...

最新文章

  1. 基于 OpenCV 的网络实时视频流传输
  2. FPGA之道(37)Verilog中的编写注意事项
  3. 关于prefrenceactivity和preferencefragment的作用
  4. Delphi 与 DirectX 之 DelphiX(46): TDIB.DoAntiAlias;
  5. 前端开发:小程序--第一步
  6. TCP浅谈为什么3次握手
  7. 数据库 数据库编程二
  8. 显示低帧率排查思路记录
  9. 如何使用阿里云矢量图标库
  10. Access入门简单教程
  11. 求 HCDA认证题库
  12. canvas绘制网易云logo
  13. nodejs php 模板,玩转nodejs
  14. 2018 mysql 笔试题_2018秋招数据库笔试面试题汇总
  15. java基础:面向对象编程23-this课后练习boygirl
  16. 《云计算和大数据时代网络技术揭秘》读后感
  17. release版本和debug版本
  18. C语言实现 百钱百鸡
  19. 智慧司法微信小程序项目
  20. sfc流程图怎么画_SFC顺序功能图教程

热门文章

  1. python中的main函数可以被其他文件调用么_在Python中,如何在另一个py文件的[if\u name\uuuu='\uu main\uu']中调用子例程?...
  2. 牛客-紫魔法师(仙人掌染色-判奇环)
  3. 刷脸支付潮酷在年轻化的场景中颇受欢迎
  4. 《从0到1》读书摘要
  5. 分享5个插件,助你在Python的道路越战越勇
  6. Scala 编程题四 继承
  7. 明伟LRS-350-24方案,送BOM表,原理图、PCB
  8. 基础语法篇_11——坐标空间和转换,图形的保存和重绘,元文件
  9. C++ Primer Plus (第六版)编程练习记录(chapter14 C++中的代码重用)
  10. 影石创新IPO被暂缓审议,科创板上市委质疑其可能存在董事会僵局