Spark概述

核心模块


Spark编程配置

IDEA配置scala环境

IDEA软件中Scala配置安装教程(Spark计算环境搭建)_jing_zhong的博客-CSDN博客

较全的idea2020.3配置Scala_MI_farmer的博客-CSDN博客_idea配置scala sdk

记得编写Spark之前导入Spark所需库

WordCount案例

hello,(hello,hello,hello,hello)实现

def main(args: Array[String]): Unit = {// Application// Spark框架// TODO 建立连接val conf = new SparkConf().setMaster("local").setAppName("word count")val sc = new SparkContext(conf)// TODO 执行操作//read txtval lines:RDD[String] = sc.textFile("D:/txt/goodnight.txt")val words:RDD[String] = lines.flatMap(_.split(" "))val group:RDD[(String,Iterable[String])] = words.groupBy(word => word)val count = group.map{case (word, list)=>{(word, list.size)}}.collect().foreach(println)// TODO 关闭连接sc.stop()}

hello,(1,1,1,1)实现

def main(args: Array[String]): Unit = {// Application// Spark框架// TODO 建立连接val conf = new SparkConf().setMaster("local").setAppName("word count")val sc = new SparkContext(conf)// TODO 执行操作val lines:RDD[String] = sc.textFile("D:/txt/goodnight.txt")val words:RDD[String] = lines.flatMap(_.split(" "))words.map(word=>(word,1)).reduceByKey(_+_).collect().foreach(println(_))// TODO 关闭连接sc.stop()}

Spark-Standalone运行环境

Local配置步骤

将spark-3.0.0-bin-hadoop3.2.tar解压至自定文件夹

tar -zxvf spark-3.0.0-bin-hadoop3.2.tgz

运行bin/spark-shell ( ctrl+C/:quit退出 )

[root@hadoop1 spark-3.0.0-bin-hadoop3.2]# bin/spark-shell
23/01/06 18:09:15 WARN NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable
Using Spark's default log4j profile: org/apache/spark/log4j-defaults.properties
Setting default log level to "WARN".
To adjust logging level use sc.setLogLevel(newLevel). For SparkR, use setLogLevel(newLevel).
Spark context Web UI available at http://hadoop1:4040
Spark context available as 'sc' (master = local[*], app id = local-1672999762111).
Spark session available as 'spark'.
Welcome to____              __/ __/__  ___ _____/ /___\ \/ _ \/ _ `/ __/  '_//___/ .__/\_,_/_/ /_/\_\   version 3.0.0/_/Using Scala version 2.12.10 (Java HotSpot(TM) 64-Bit Server VM, Java 1.8.0_291)
Type in expressions to have them evaluated.
Type :help for more information.scala> val i = 10
i: Int = 10scala> sc.textFile("data/word.txt").flatMap(_.split(" ")).map((_,1)).reduceByKey
reduceByKey   reduceByKeyLocallyscala> sc.textFile("data/word.txt").flatMap(_.split(" ")).map((_,1)).reduceByKey(_+_).collect
res0: Array[(String, Int)] = Array((Hello,4), (Java,1), (World,1), (Scala,2))

Spark context Web UI available at http://hadoop1:4040

可以使用hostname:4040查看Web UI

集群分工

hadoop1 hadoop2 hadoop3
Master worker worker

解压文件

将 spark-3.0.0-bin-hadoop3.2.tgz 文件上传到 Linux 并解压缩在指定位置

tar -zxvf spark-3.0.0-bin-hadoop3.2.tgz

mv spark-3.0.0-bin-hadoop3.2 spark-standalone

修改配置

1) 进入解压缩后路径的 conf 目录,修改 slaves.template 文件名为 slaves

2) 用记事本修改 slaves 文件,添加 work 节点

添加所有的worker结点主机名

hadoop1

hadoop2

hadoop3

3) 修改 spark-env.sh.template 文件名为 spark-env.sh

4) 修改 spark-env.sh 文件,添加 JAVA_HOME 环境变量和指定集群对应的 master 节点

export JAVA_HOME=/opt/module/jdk1.8.0_144(你的jdk路径)

SPARK_MASTER_HOST=hadoop1

SPARK_MASTER_PORT=7077

注意:7077 端口为默认,相当于 hadoop3 内部通信的 8020 端口,此处的端口需要确认自己的 Hadoop 配置

5) 分发spark-standalone目录到其他worker主机

scp -r /export/server/spark-standalone root@hadoop2:/export/server/spark-standalone

scp -r /export/server/spark-standalone root@hadoop3:/export/server/spark-standalone

启动集群

sbin/start-all.sh

查看 Master 资源监控 Web UI 界面: http://hadoop1:8080

配置历史服务器

1) 修改conf/spark-defaults.conf.template 文件名为 spark-defaults.conf

2) 修改 spark-default.conf 文件,配置日志存储路径

spark.eventLog.enabled true

spark.eventLog.dir hdfs://hadoop1:8020/directory

hdfs中的目录需存在

3) 修改 spark-env.sh 文件, 添加日志配置

export SPARK_HISTORY_OPTS=" -Dspark.history.ui.port=18080 -Dspark.history.fs.logDirectory=hdfs://linux1:8020/directory -Dspark.history.retainedApplications=30"

4) 分发conf到其他worker对应目录

scp -r /export/server/spark-standalone root@hadoop2:/export/server/spark-standalone

scp -r /export/server/spark-standalone root@hadoop3:/export/server/spark-standalone

5) 先启动HDFS,后spark集群和历史服务器

HADOOP_HOME/sbin/start-all.sh

SPARK_HOME/sbin/start-all.sh

SPARK_HOME/sbin/start-history-server.sh

6) 重新执行任务

bin/spark-submit \

--class org.apache.spark.examples.SparkPi \

--master spark://linux1:7077 \

./examples/jars/spark-examples_2.12-3.0.0.jar \

10

7) 查看历史服务:http://linux1:18080

Spark-Yarn运行环境

独立部署(Standalone)模式由 Spark 自身提供计算资源,无需其他框架提供资源。这种方式降低了和其他第三方资源框架的耦合性,独立性非常强。但是也要记住,Spark 主要是计算框架,而不是资源调度框架,所以本身提供的资源调度并不是它的强项,所以还是和其他专业的资源调度框架集成会更靠谱一些,在国内工作中,Yarn 使用的非常多。

配置步骤

将spark-3.0.0-bin-hadoop3.2.tar解压至自定文件夹

tar -zxvf spark-3.0.0-bin-hadoop3.2.tgz

mv spark-3.0.0-bin-hadoop3.2 spark-yarn

1) 修改 hadoop 配置文件HADOOP_HOME/etc/hadoop/yarn-site.xml, 并分发

    <!-- 是否将对容器实施物理内存限制 --><property><name>yarn.nodemanager.pmem-check-enabled</name><value>false</value></property><!-- 是否将对容器实施虚拟内存限制。 --><property><name>yarn.nodemanager.vmem-check-enabled</name><value>false</value></property>

2) 修改 SPARK_HOME/conf/spark-env.sh,添加 JAVA_HOME 和 YARN_CONF_DIR 配置

mv spark-env.sh.template spark-env.sh (改文件名)

添加如下内容:(每个人不同!!!)

export JAVA_HOME=/export/server/jdk1.8.0_291 YARN_CONF_DIR=/export/server/hadoop3.3.3/etc/hadoop

3) 启动HDFS和YARN集群

HADOOP_HOME/sbin/start-all.sh

4) 提交任务

bin/spark-submit \

--class org.apache.spark.examples.SparkPi \

--master yarn \

--deploy-mode cluster \

./examples/jars/spark-examples_2.12-3.0.0.jar \

10

配置历史服务器

1) 修改 spark-defaults.conf.template 文件名为 spark-defaults.conf

mv spark-defaults.conf.template spark-defaults.conf

2) 修改 spark-default.conf 文件,配置日志存储路径

spark.eventLog.enabled        true

spark.eventLog.dir                 hdfs://linux1:8020/directory

注意:需要启动 hadoop 集群,HDFS 上的目录需要提前存在。

3) 修改 spark-env.sh 文件, 添加日志配置

export SPARK_HISTORY_OPTS="

-Dspark.history.ui.port=18080

-Dspark.history.fs.logDirectory=hdfs://linux1:8020/directory

-Dspark.history.retainedApplications=30"

⚫ 参数 1 含义:WEB UI 访问的端口号为 18080

⚫ 参数 2 含义:指定历史服务器日志存储路径

⚫ 参数 3 含义:指定保存 Application 历史记录的个数,如果超过这个值,旧的应用程序 信息将被删除,这个是内存中的应用数,而不是页面上显示的应用数。

4) 修改 spark-defaults.conf

spark.yarn.historyServer.address=linux1:18080

spark.history.ui.port=18080

5) 启动历史服务

sbin/start-history-server.sh

6) 重新提交应用

bin/spark-submit \

--class org.apache.spark.examples.SparkPi \

--master yarn \

--deploy-mode client \

./examples/jars/spark-examples_2.12-3.0.0.jar \

10

7) 查看Web UI  hostname:8088

可以跳转到hostname:18080

Windows运行环境

配置步骤

1) 将文件 spark-3.0.0-bin-hadoop3.2.tgz 解压缩到无中文无空格的路径中

2) 执行解压缩文件路径下 bin 目录中的 spark-shell.cmd 文件,启动 Spark 本地环境,可以配置环境变量后直接在cmd内执行

环境变量:

SPARK_HOME=D:\myjava\spark\spark-3.0.0-bin-hadoop3.2\bin

常用端口号

➢Spark 查看当前 Spark-shell 运行任务情况端口号:4040(计算)

➢ Spark Master 内部通信服务端口号:7077

➢ Standalone 模式下,Spark Master Web 端口号:8080(资源)

➢ Spark 历史服务器端口号:18080

➢ Hadoop YARN 任务运行情况查看端口号:8088

Spark架构

Spark 框架的核心是一个计算引擎,整体来说,它采用了标准 master-slave 的结构。

如下图所示,它展示了一个 Spark 执行时的基本结构。图形中的 Driver 表示 master, 负责管理整个集群中的作业任务调度。图形中的 Executor 则是 slave,负责实际执行任务。

核心组件

Driver

Spark 驱动器节点,用于执行 Spark 任务中的 main 方法,负责实际代码的执行工作。

Driver 在 Spark 作业执行时主要负责:

➢ 将用户程序转化为作业(job)

➢ 在 Executor 之间调度任务(task)

➢ 跟踪 Executor 的执行情况

➢ 通过 UI 展示查询运行情况

实际上,我们无法准确地描述 Driver 的定义,因为在整个的编程过程中没有看到任何有关 Driver 的字眼。所以简单理解,所谓的 Driver 就是驱使整个应用运行起来的程序,也称之为 Driver 类。

Executor

Spark Executor 是集群中工作节点(Worker)中的一个 JVM 进程,负责在 Spark 作业 中运行具体任务(Task),任务彼此之间相互独立。

Spark 应用启动时,Executor 节点被同时启动,并且始终伴随着整个 Spark 应用的生命周期而存在。

如果有 Executor 节点发生了 故障或崩溃,Spark 应用也可以继续执行,会将出错节点上的任务调度到其他 Executor 节点 上继续运行。

Executor 有两个核心功能:

➢ 负责运行组成 Spark 应用的任务,并将结果返回给驱动器进程

➢ 它们通过自身的块管理器(Block Manager)为用户程序中要求缓存的 RDD 提供内存 式存储。RDD 是直接缓存在 Executor 进程内的,因此任务可以在运行时充分利用缓存         数据加速运算。

Master & Worker

Spark 集群的独立部署环境中,不需要依赖其他的资源调度框架,自身就实现了资源调 度的功能,所以环境中还有其他两个核心组件:Master 和 Worker,这里的 Master 是一个进 程,主要负责资源的调度和分配,并进行集群的监控等职责,类似于 Yarn 环境中的 RM, 而 Worker 呢,也是进程,一个 Worker 运行在集群中的一台服务器上,由 Master 分配资源对 数据进行并行的处理和计算,类似于 Yarn 环境中 NM。

ApplicationMaster Hadoop

用户向 YARN 集群提交应用程序时,提交程序中应该包含 ApplicationMaster,用 于向资源调度器申请执行任务的资源容器 Container,运行用户自己的程序任务 job,监控整 个任务的执行,跟踪整个任务的状态,处理任务失败等异常情况。 说的简单点就是,ResourceManager(资源)和 Driver(计算)之间的解耦合靠的就是 ApplicationMaster。

核心概念

Executor 与 Core

Spark Executor 是集群中运行在工作节点(Worker)中的一个 JVM 进程,是整个集群中 的专门用于计算的节点。在提交应用中,可以提供参数指定计算节点的个数,以及对应的资 源。这里的资源一般指的是工作节点 Executor 的内存大小和使用的虚拟 CPU 核(Core)数量。

并行度(Parallelism)

在分布式计算框架中一般都是多个任务同时执行,由于任务分布在不同的计算节点进行 计算,所以能够真正地实现多任务并行执行,记住,这里是并行,而不是并发。这里我们将整个集群并行执行任务的数量称之为并行度。那么一个作业到底并行度是多少呢?这个取决 于框架的默认配置。应用程序也可以在运行过程中动态修改。

并发与并行的区别_一缕阳光a的博客-CSDN博客_并发和并行的区别

提交流程

所谓的提交流程,其实就是我们开发人员根据需求写的应用程序通过 Spark 客户端提交 给 Spark 运行环境执行计算的流程。在不同的部署环境中,这个提交过程基本相同,但是又有细微的区别,国内工作中,将 Spark 引用部署到 Yarn 环境中会更多一些,所以本提交流程是基于 Yarn 环境的。

Spark 应用程序提交到 Yarn 环境中执行的时候,一般会有两种部署执行的方式:Client 和 Cluster。两种模式主要区别在于:Driver 程序的运行节点位置。

Yarn Client 模式

Client 模式将用于监控和调度的 Driver 模块在客户端执行,而不是在 Yarn 中,所以一 般用于测试。

➢ Driver 在任务提交的本地机器上运行

➢ Driver 启动后会和 ResourceManager 通讯申请启动 ApplicationMaster

➢ ResourceManager 分配 container,在合适的 NodeManager 上启动 ApplicationMaster,负责向 ResourceManager 申请 Executor 内存

➢ ResourceManager 接到 ApplicationMaster 的资源申请后会分配 container,然后 ApplicationMaster 在资源分配指定的 NodeManager 上启动 Executor 进程

➢ Executor 进程启动后会向 Driver 反向注册,Executor 全部注册完成后 Driver 开始执行 main 函数

➢ 之后执行到 Action 算子时,触发一个 Job,并根据宽依赖开始划分 stage,每个 stage 生 成对应的 TaskSet,之后将 task 分发到各个 Executor 上执行。

Yarn Cluster 模式

Cluster 模式将用于监控和调度的 Driver 模块启动在 Yarn 集群资源中执行。一般应用于 实际生产环境。

➢ 在 YARN Cluster 模式下,任务提交后会和 ResourceManager 通讯申请启动 ApplicationMaster,

➢ 随后 ResourceManager 分配 container,在合适的 NodeManager 上启动 ApplicationMaster, 此时的 ApplicationMaster 就是 Driver。

➢ Driver 启动后向 ResourceManager 申请 Executor 内存,ResourceManager 接到 ApplicationMaster 的资源申请后分配 container,然后在合适 NodeManager 上启动 Executor 进程

➢ Executor 进程启动后会向 Driver 反向注册,Executor 全部注册完成后 Driver 开始执行 main 函数,

➢ 之后执行到 Action 算子时,触发一个 Job,并根据宽依赖开始划分 stage,每个 stage 生 成对应的 TaskSet,之后将 task 分发到各个 Executor 上执行。

分布式计算模拟代码

基础

import java.net.Socketobject Driver {def main(args: Array[String]): Unit = {val clint = new Socket("localhost", 9999)println("clint连接9999")val out = clint.getOutputStreamout.write(2)out.flush()out.close()clint.close()}
}// =============================================import java.net.ServerSocketobject Executor {def main(args: Array[String]): Unit = {val server = new ServerSocket(9999)println("Server启动,等待客户端数据")val clint = server.accept()val stream = clint.getInputStreamval data = stream.read()println("Clint数据: "+ data)stream.close()clint.close()server.close()}
}

发送计算任务

import java.io.ObjectInputStream
import java.net.ServerSocket
object Executor {def main(args: Array[String]): Unit = {val server = new ServerSocket(9999)println("Server启动,等待客户端数据")val clint = server.accept()val stream = clint.getInputStreamval objstream = new ObjectInputStream(stream)val task:Task = objstream.readObject().asInstanceOf[Task]val data:List[Int] = task.compute()println("Clint数据: "+ data)stream.close()clint.close()server.close()}
}import java.io.ObjectOutputStream
import java.net.Socket
object Driver {def main(args: Array[String]): Unit = {val clint = new Socket("localhost", 9999)println("clint连接9999")val out = clint.getOutputStreamval objout = new ObjectOutputStream(out)val task = new Task()objout.writeObject(task)objout.flush()objout.close()clint.close()println("客户端数据发送完")}
}class Task extends Serializable {val datas: List[Int] = List(1,2,3,4,5)val logic: Int => Int = (x:Int)=>{x*2}def compute()={datas.map(logic)}
}

分布式任务模拟

spark分布式计算模拟代码-spark文档类资源-CSDN下载

Spark Core核心编程

Spark 计算框架为了能够进行高并发和高吞吐的数据处理,封装了三大数据结构,用于 处理不同的应用场景。

三大数据结构分别是:

➢ RDD : 弹性分布式数据集

➢ 累加器:分布式共享只写变量

➢ 广播变量:分布式共享只读变量

RDD概念

RDD(Resilient Distributed Dataset)叫做弹性分布式数据集,是 Spark 中最基本的数据 处理模型。

代码中是一个抽象类,它代表一个弹性的、不可变、可分区、里面的元素可并行 计算的集合。

➢ 弹性

⚫ 存储的弹性:内存与磁盘的自动切换;

⚫ 容错的弹性:数据丢失可以自动恢复;

⚫ 计算的弹性:计算出错重试机制;

⚫ 分片的弹性:可根据需要重新分片。

➢ 分布式:数据存储在大数据集群不同节点上

➢ 数据集:RDD 封装了计算逻辑,并不保存数据

➢ 数据抽象:RDD 是一个抽象类,需要子类具体实现

➢ 不可变:RDD 封装了计算逻辑,是不可以改变的,想要改变,只能产生新的 RDD,在 新的 RDD 里面封装计算逻辑

➢ 可分区、并行计算

方法

大数据Spark学习笔记—未更完相关推荐

  1. 大数据业务学习笔记_学习业务成为一名出色的数据科学家

    大数据业务学习笔记 意见 (Opinion) A lot of aspiring Data Scientists think what they need to become a Data Scien ...

  2. 大数据HiveSQL学习笔记三-查询基础语法以及常用函数

    大数据HiveSQL学习笔记三-查询基础语法以及常用函数 一.基础语法 1.SELECT -列名- FROM -表名- WHERE -筛选条件- 如:需要根据城市,性别找出匹配的10个用户 user_ ...

  3. 2018大数据培训学习路线图(详细完整版)

    2018大数据培训学习路线全课程目录+学习线路详解(详细完整版) 第一阶段:大数据基础Java语言基础阶段 1.1:Java开发介绍 1.1.1 Java的发展历史 1.1.2 Java的应用领域 1 ...

  4. 大数据 -- kafka学习笔记:知识点整理(部分转载)

    一 为什么需要消息系统 1.解耦 允许你独立的扩展或修改两边的处理过程,只要确保它们遵守同样的接口约束. 2.冗余 消息队列把数据进行持久化直到它们已经被完全处理,通过这一方式规避了数据丢失风险.许多 ...

  5. 读《从0开始学大数据》-- 学习笔记和感想随笔(一)

    主要记录阅读<从0开始学大数据>课程的学习笔记.课程系统性的介绍大数据的发展史.大数据系统的原理及架构.大数据生态体系中的主要产品.如何进行呢大数据开发实践.大数据平台开发及系统集成.使用 ...

  6. 云计算与大数据课程学习笔记

    一. 1.什么是云计算? 云计算(cloud computing)是分布式计算的一种,指的是通过网络"云"将巨大的数据计算处理程序分解成无数个小程序,然后,通过多部服务器组成的系统 ...

  7. [大数据]-- 大数据spark学习课程安排

    文章来源:http://www.dataguru.cn/forum.php?mod=viewthread&tid=316241 为了广大学员更好的学习spark,对<Spark大数据快速 ...

  8. 大数据生态圈学习笔记

    第一章 生态圈概述 1.Hadoop版本区别: Hadoop1.0:HDFS+MapReduce Hadoop2.0:HDFS+YARN+(MapReduce+Others) 2.分布式存储系统HDF ...

  9. 大数据计算系统学习笔记

    1.大数据计算体系可归纳三个基本层次: 数据应用系统,数据处理系统,数据存储系统 2.计算的总体架构 HDFS(Hadoop 分布式文件系统) (1)设计思想:分而治之,将大文件大批量文件,分布式存放 ...

最新文章

  1. 又一联盟成立:清华、北大、深大、南科大、哈工大等12家在深单位加盟
  2. 会议冲突!临时更新客户端!这些在线视频会议痛点统统解决掉!
  3. android 地图相册,时光地图相册
  4. python编程题3
  5. MySQL主从同步校验与重新同步
  6. 在SAP分析云里根据业务数据绘制词云(Word Cloud)
  7. 《图解 HTTP》读书笔记(未完待续)
  8. electron安装比较慢的方法
  9. Java原生API操作XML
  10. SQL开发技巧(二) 【转】感觉他写的很好
  11. 中国研究生数学建模竞赛试题
  12. 名字打架小游戏 java_闲暇极品MD5 能用名字打架的小游戏
  13. 传染病模型-java代码
  14. oracle px execute reply,关于昨天的PX Deq: Execute Reply重新开贴请教
  15. STM32开发基础知识——OLED开发基础
  16. Linux版本的mcnp6,Initial MCNP6 release overview. MCNP6 version 0.1
  17. 小米妙享偷渡用户升级方法(3.0.2.68)
  18. 利用花生壳来调试4G网络模块
  19. Angular4介绍
  20. 团队编程——web应用之人事管理系统

热门文章

  1. SaveFileDialog无法打开
  2. python打折简单程序每满_丰满Python程序设计基础【实境编程】_高校邦_答案
  3. Ubuntu-C语言下的应用
  4. 如何做SEO如何优化排名教程
  5. ​ ​c盘满了怎么扩展分区?​c盘太小如何扩展分区 ​
  6. 商学院全球化管理论坛 思科总裁林正刚作主题发言
  7. wps office 使用积累 WPS 2016表格怎么自动适应行高列宽
  8. word的文档修订功能
  9. Android 全景视频播放器(VR视频播放器探索)
  10. 【验证码刷新不出来及其解决】