1、Spark Java-Scala 混编Maven开发

(1)IDEA创建Maven 项目

  • 创建项目

  • 配置名称,点击下一步配置Maven及本地Maven仓库地址。
  • 配置项目名称和位置,并创建。
  • 更新替换Maven pom.xml文件,注意groupId,artifactId,version不要更新替换。
  • pom.xml见

https://blog.csdn.net/qq_41946557/article/details/102639605

  • 在main 目录下创建目录。名称任意。

  • 将main下的java和scala指定为源目录:

2、SparkCore

  • RDD
  • 概念

RDD(Resilient Distributed Dateset),弹性分布式数据集。

  • RDD的五大特性:
* Internally, each RDD is characterized by five main properties:
*
*  - A list of partitions
*  - A function for computing each split
*  - A list of dependencies on other RDDs
*  - Optionally, a Partitioner for key-value RDDs (e.g. to say that the RDD is hash-partitioned)
*  - Optionally, a list of preferred locations to compute each split on (e.g. block locations for
*    an HDFS file)
  1. RDD是由一系列的partition组成的。
  2. 函数是作用在每一个partition(split)上的。
  3. RDD之间有一系列的依赖关系。
  4. 分区器是作用在K,V格式的RDD上。
  5. RDD提供一系列最佳的计算位置。
  • RDD完美理解图:

  • 注意:
  1. textFile方法底层封装的是读取MR读取文件的方式,读取文件之前先split,默认split大小是一个block大小。
  2. RDD实际上不存储数据,这里方便理解,暂时理解为存储数据。
  3. 什么是K,V格式的RDD?
  • 如果RDD里面存储的数据都是二元组对象,那么这个RDD我们就叫做K,V格式的RDD。
  1. 哪里体现RDD的弹性(容错)?
  • partition数量,大小没有限制,体现了RDD的弹性。
  • RDD之间依赖关系,可以基于上一个RDD重新计算出RDD。
  1. 哪里体现RDD的分布式?
  • RDD是由Partition组成,partition是分布在不同节点上的。
  • RDD提供计算最佳位置,体现了数据本地化。体现了大数据中“计算移动数据不移动”的理念。

完美总结图:::

  • Spark任务执行原理

以上图中有四个机器节点,Driver和Worker是启动在节点上的进程,运行在JVM中的进程。

  • Driver与集群节点之间有频繁的通信。
  • Driver负责任务(tasks)的分发和结果的回收。任务的调度。如果task的计算结果非常大就不要回收了。会造成oom。
  • Worker是Standalone资源调度框架里面资源管理的从节点。也是JVM进程。
  • Master是Standalone资源调度框架里面资源管理的主节点。也是JVM进程。

Spark代码流程

  • 创建SparkConf对象
  1. 可以设置Application name。
  2. 可以设置运行模式及资源需求。
  • 创建SparkContext对象
  • 基于Spark的上下文创建一个RDD,对RDD进行处理。
  • 应用程序中要有Action类算子来触发Transformation类算子执行。
  • 关闭Spark上下文对象SparkContext。

【注,后面有代码实例】


Transformations转换算子

  • 概念:

Transformations类算子是一类算子(函数)叫做转换算子,如map,flatMap,reduceByKey等。Transformations算子是延迟执行,也叫懒加载执行。

  • Transformation类算子:
filter
过滤符合条件的记录数,true保留,false过滤掉。map
将一个RDD中的每个数据项,通过map中的函数映射变为一个新的元素。
特点:输入一条,输出一条数据。flatMap
先map后flat。与map类似,每个输入项可以映射为0到多个输出项。sample
随机抽样算子,根据传进去的小数按比例进行又放回或者无放回的抽样。reduceByKey
将相同的Key根据相应的逻辑进行处理。
sortByKey/sortBy
作用在K,V格式的RDD上,对key进行升序或者降序排序。

Action行动算子

  • 概念:

Action类算子也是一类算子(函数)叫做行动算子,如foreach,collect,count等。Transformations类算子是延迟执行,Action类算子是触发执行。一个application应用程序中有几个Action类算子执行,就有几个job运行。

  • Action类算子
count
返回数据集中的元素数。会在结果计算完成后回收到Driver端。take(n)
返回一个包含数据集前n个元素的集合。first
first=take(1),返回数据集中的第一个元素。foreach
循环遍历数据集中的每个元素,运行相应的逻辑。collect
将计算结果回收到Driver端。

代码演示:见下篇博客:

https://blog.csdn.net/qq_41946557/article/details/102646935



控制算子

  • 概念:

控制算子有三种,cache,persist,checkpoint,以上算子都可以将RDD持久化,持久化的单位是partition。cache和persist都是懒执行的。必须有一个action类算子触发执行。checkpoint算子不仅能将RDD持久化到磁盘,还能切断RDD之间的依赖关系。

  • cache

默认将RDD的数据持久化到内存中。cache是懒执行。

  • 注意:chche () = persist()=persist(StorageLevel.Memory_Only)
  • 测试cache文件:

测试代码:

package ddd.henu.persistentimport org.apache.spark.{SparkConf, SparkContext}object CacheTest {def main(args: Array[String]): Unit = {val conf = new SparkConf().setMaster("local").setAppName("cache")val sc = new SparkContext(conf)sc.setLogLevel("error")var lines = sc.textFile("./data/persistData.txt")lines = lines.cache()val startTime1 = System.currentTimeMillis()val result1 = lines.count()     //当第一次运行时,从磁盘读取。val endTime1 = System.currentTimeMillis()println(s"条数: $result1,磁盘time:${endTime1-startTime1}")val startTime2 = System.currentTimeMillis()val result2 = lines.count()     //第二次,从缓存val endTime2 = System.currentTimeMillis()println(s"条数: $result2,内存time:${endTime2-startTime2}")/*结果:条数: 5138965,磁盘time:6085条数: 5138965,内存time:111*/sc.stop()}
}

【注】System.currentTimeMillis()小知识:存在性能问题

https://blog.csdn.net/qq_41946557/article/details/102647865


  • persist:

可以指定持久化的级别。最常用的是MEMORY_ONLY和MEMORY_AND_DISK。”_2”表示有副本数。

持久化级别如下:

【注】MEMORY_AND_DISK指先存内存,存不下后,存入磁盘,会序列化,虽说写的false.

代码部分演示:

  • cache和persist的注意事项:
  1. cache和persist都是懒执行,必须有一个action类算子触发执行。
  2. cache和persist算子的返回值可以赋值给一个变量,在其他job中直接使用这个变量就是使用持久化的数据了。持久化的单位是partition。
  3. cache和persist算子后不能立即紧跟action算子。
  4. cache和persist算子持久化的数据当applilcation执行完成之后会被清除。

错误:rdd.cache().count() 返回的不是持久化的RDD,而是一个数值了。


  • checkpoint

checkpoint将RDD持久化到磁盘,还可以切断RDD之间的依赖关系。checkpoint目录数据当application执行完之后不会被清除。

  • checkpoint 的执行原理:
  1. 当RDD的job执行完毕后,会从finalRDD从后往前回溯。
  2. 当回溯到某一个RDD调用了checkpoint方法,会对当前的RDD做一个标记。
  3. Spark框架会自动启动一个新的job,重新计算这个RDD的数据,将数据持久化到HDFS上。
  • 优化:对RDD执行checkpoint之前,最好对这个RDD先执行cache,这样新启动的job只需要将内存中的数据拷贝到HDFS上就可以,省去了重新计算这一步。
  • 使用:
 SparkConf conf = new SparkConf();conf.setMaster("local").setAppName("checkpoint");JavaSparkContext sc = new JavaSparkContext(conf);sc.setCheckpointDir("./checkpoint");JavaRDD<Integer> parallelize = sc.parallelize(Arrays.asList(1,2,3));parallelize.checkpoint();parallelize.count();sc.stop();

Spark _02SparkCore_RDD相关推荐

  1. hadoop,spark,scala,flink 大数据分布式系统汇总

    20220314 https://shimo.im/docs/YcPW8YY3T6dT86dV/read 尚硅谷大数据文档资料 iceberg相当于对hive的读写,starrocks相当于对mysq ...

  2. spark,hadoop区别

    https://zhuanlan.zhihu.com/p/95016937 Spark和Hadoop的区别和比较: 1.原理比较: Hadoop和Spark都是并行计算,两者都是用MR模型进行计算 H ...

  3. 大规模数据处理Apache Spark开发

    大规模数据处理Apache Spark开发 Spark是用于大规模数据处理的统一分析引擎.它提供了Scala.Java.Python和R的高级api,以及一个支持用于数据分析的通用计算图的优化引擎.它 ...

  4. 客快物流大数据项目(五十四):初始化Spark流式计算程序

    目录 初始化Spark流式计算程序 一.SparkSql参数调优设置 1.设置会话时区

  5. 客快物流大数据项目(四十六):Spark操作Kudu dataFrame操作kudu

    Spark操作Kudu dataFrame操作kudu 一.DataFrameApi读取kudu表中的数据 虽然我们可以通过上面显示的KuduContext执行大量操作,但我们还可以直接从默认数据源本 ...

  6. ❤️Spark的关键技术回顾,持续更新!【推荐收藏加关注】❤️

    目录 前言 Spark的关键技术回顾 一.Spark复习题回顾 1.Spark使用的版本 2.Spark几种部署方式? 3.Spark的提交任务的方式? 4.使用Spark-shell的方式也可以交互 ...

  7. 2021年大数据Spark(五十三):Structured Streaming Deduplication

    目录 Streaming Deduplication 介绍 需求 ​​​​​​​代码演示 Streaming Deduplication 介绍 在实时流式应用中,最典型的应用场景:网站UV统计. 1: ...

  8. 2021年大数据Spark(五十二):Structured Streaming 事件时间窗口分析

    目录 事件时间窗口分析 时间概念 ​​​​​​​event-time ​​​​​​​延迟数据处理 ​​​​​​​延迟数据 ​​​​​​​Watermarking 水位 ​​​​​​​官方案例演示 事件 ...

  9. 2021年大数据Spark(五十一):Structured Streaming 物联网设备数据分析

    目录 ​​​​​​​物联网设备数据分析 ​​​​​​​设备监控数据准备 ​​​​​​​创建Topic ​​​​​​​模拟数据 ​​​​​​​SQL风格 ​​​​​​​DSL风格 物联网设备数据分析 在 ...

最新文章

  1. 深度卷积生成对抗网络
  2. 七层负载均衡--Haproxy
  3. python可以干嘛知乎-一行Python代码能做什么?
  4. Spring配置数据源的四种方式
  5. LeetCode-77-Combinations
  6. POJ3686 The Windy's 【费用流】*
  7. PHP 显示本机的外网IP
  8. 了解你所不知道的SMON功能(五):Recover Dead transaction
  9. C语言运算符及其优先级汇总表口诀
  10. Linux shell 脚本实例
  11. 技术能力与真不是几年经验成正比的
  12. Android 项目规范
  13. Exploring Complementary Strengths of Invariant and Equivariant Representations for Few-Shot Learning
  14. 汇编语言--计算 ffff:0 ~ ffff:b 单元中的数据的和,存储在 dx 中
  15. python超链接程序,python超链接
  16. 《Graph Neural Network with Heterophily》阅读笔记
  17. QNX-Adaptive Partition
  18. IDA PRO 静态反汇编与OllyDbg动态调试实战技巧汇总
  19. 刷脸支付服务商市场空白大有可为
  20. 【叶子函数分享三十】SQL简繁转换函数

热门文章

  1. HDU - 6356 Glad You Came(线段树)
  2. CodeForces - 1334D Minimum Euler Cycle(构造+模拟)
  3. CodeForces - 456C Boredom(线性dp)
  4. HDU - 3374 String Problem(最小表示法+最大表示法+KMP的next数组)
  5. 洛谷 - P1308 统计单词数(字符串+模拟)
  6. CH - 6901 骑士放置(二分图最大独立集-二分图最大匹配+奇偶拆点)
  7. 最小覆盖圆的增量算法
  8. SQL 性能优化梳理
  9. 总结缓存使用过程中的几种策略以及优缺点组合分析
  10. 每日一道算法题-寻找丑数