一 安装指引

(91条消息) [Hadoop] mac搭建hadoop3.X 伪分布模式_小墨鱼的专栏-CSDN博客https://zengwenqi.blog.csdn.net/article/details/122427729

二 Spark基础概念

Spark的核心概念

Spark 是 UC Berkeley AMP lab 开发的一个集群计算的框架,类似于 Hadoop,但有很多的区别。最大的优化是让计算任务的中间结果可以存储在内存中,不需要每次都写入 HDFS,更适用于需要迭代的 MapReduce 算法场景中,可以获得更好的性能提升。例如一次排序测试中,对 100TB 数据进行排序,Spark 比 Hadoop 快三倍,并且只需要十分之一的机器。Spark 集群目前最大的可以达到 8000 节点,处理的数据达到 PB 级别,在互联网企业中应用非常广泛。

Spark 的特性

Hadoop 的核心是分布式文件系统 HDFS 和计算框架 MapReduces。Spark 可以替代 MapReduce,并且兼容 HDFS、Hive 等分布式存储层,良好的融入 Hadoop 的生态系统。

Spark 执行的特点

中间结果输出:Spark 将执行工作流抽象为通用的有向无环图执行计划(DAG),可以将多 Stage 的任务串联或者并行执行。

数据格式和内存布局:Spark 抽象出分布式内存存储结构弹性分布式数据集 RDD,能够控制数据在不同节点的分区,用户可以自定义分区策略。

任务调度的开销:Spark 采用了事件驱动的类库 AKKA 来启动任务,通过线程池的复用线程来避免系统启动和切换开销。

Spark 的优势

  • 速度快,运行工作负载快 100 倍。Apache Spark 使用最先进的 DAG 调度器、查询优化器和物理执行引擎,实现了批处理和流数据的高性能。
  • 易于使用,支持用 Java、Scala、Python、R 和 SQL 快速编写应用程序。Spark 提供了超过 80 个算子,可以轻松构建并行应用程序。您可以从 Scala、Python、R 和 SQL shell 中交互式地使用它。
  • 普遍性,结合 SQL、流处理和复杂分析。Spark 提供了大量的库,包括 SQL 和 DataFrames、用于机器学习的 MLlib、GraphX 和 Spark 流。您可以在同一个应用程序中无缝地组合这些库。
  • 各种环境都可以运行,Spark 在 Hadoop、Apache Mesos、Kubernetes、单机或云主机中运行。它可以访问不同的数据源。您可以使用它的独立集群模式在 EC2、Hadoop YARN、Mesos 或 Kubernetes 上运行 Spark。访问 HDFS、Apache Cassandra、Apache HBase、Apache Hive 和数百个其他数据源中的数据。

Spark 生态系统 —— BDAS

目前,Spark 已经发展成为包含众多子项目的大数据计算平台。BDAS 是伯克利大学提出的基于 Spark 的数据分析栈(BDAS)。其核心框架是 Spark,同时涵盖支持结构化数据 SQL 查询与分析的查询引擎 Spark SQL,提供机器学习功能的系统 MLBase 及底层的分布式机器学习库 MLlib,并行图计算框架 GraphX,流计算框架 Spark Streaming,近似查询引擎 BlinkDB,内存分布式文件系统 Tachyon,资源管理框架 Mesos 等子项目。这些子项目在 Spark 上层提供了更高层、更丰富的计算范式。

1 . Spark Core:Spark的核心组件,其操作的数据对象是RDD(弹性分布式数据集)
图中在Spark Core上面的四个组件都依赖于Spark Core,可以简单认为Spark Core就是Spark生态系统中的离线计算框架,eg:Spark Core中提供的map,reduce算子可以完成mapreduce计算引擎所做的计算任务

2 . Spark Streaming:Spark生态系统中的流式计算框架,其操作的数据对象是DStream,其实Spark Streaming是将流式计算分解成一系列短小的批处理作业。这里的批处理引擎是Spark Core,也就是把Spark Streaming的输入数据按照batch size(批次间隔时长)(如1秒)分成一段一段的数据系列(DStream),每一段数据都转换成Spark Core中的RDD,然后将Spark Streaming中对DStream的转换计算操作变为针对Spark中对RDD的转换计算操作,如下官方提供的图

在内部实现上,DStream由一组时间序列上连续的RDD来表示。每个RDD都包含了自己特定时间间隔内的数据流(如上图中0到1秒接收到的数据成为一个RDD,1到2秒接收到的数据成为一个RDD),使用Spark Streaming对图中DStream的操作就会转化成使用Spark Core中的对应算子(函数)对Rdd的操作

3 . Spark SQL:可以简单认为可以让用户使用写SQL的方式进行数据计算,SQL会被SQL解释器转化成Spark core任务,让懂SQL不懂spark的人都能通过写SQL的方式进行数据计算,类似于hive在Hadoop生态圈中的作用,提供SparkSql CLI(命令行界面),可以再命令行界面编写SQL

4 . Spark Graphx:Spark生态系统中的图计算和并行图计算,目前较新版本已支持PageRank、数三角形、最大连通图和最短路径等6种经典的图算法

5 . Spark Mlib:一个可扩展的Spark机器学习库,里面封装了很多通用的算法,包括二元分类、线性回归、聚类、协同过滤等。用于机器学习和统计等场景

Q1: 什么是RDD

RDD的全称是 Resilient Distributed Datasets,这是Spark的一种数据抽象集合,它可以被执行在分布式的集群上进行各种操作,而且有较强的容错机制。RDD可以被分为若干个分区,每一个分区就是一个数据集片段,从而可以支持分布式计算。它们是在多个节点上运行和操作以在集群上进行并行处理的元素。RDD是不可变元素,这意味着一旦创建了RDD,就无法对其进行更改。RDD也具有容错能力,因此在发生任何故障时,它们会自动恢复。您可以对这些RDD应用多个操作来完成某项任务。

Q2: RDD运行时相关的关键名词

简单来说可以有 Client、Job、Master、Worker、Driver、Stage、Task以及Executor,这几个东西在调优的时候也会经常遇到的。

Client:指的是客户端进程,主要负责提交job到Master;

Job:Job来自于我们编写的程序,Application包含一个或者多个job,job包含各种RDD操作;

Master:指的是Standalone模式中的主控节点,负责接收来自Client的job,并管理着worker,可以给worker分配任务和资源(主要是driver和executor资源);

Worker:指的是Standalone模式中的slave节点,负责管理本节点的资源,同时受Master管理,需要定期给Master回报heartbeat(心跳),启动Driver和Executor;

Driver:指的是 job(作业)的主进程,一般每个Spark作业都会有一个Driver进程,负责整个作业的运行,包括了job的解析、Stage的生成、调度Task到Executor上去执行;

Stage:中文名 阶段,是job的基本调度单位,因为每个job会分成若干组Task,每组任务就被称为 Stage;

Task:任务,指的是直接运行在executor上的东西,是executor上的一个线程;

Executor:指的是 执行器,顾名思义就是真正执行任务的地方了,一个集群可以被配置若干个Executor,每个Executor接收来自Driver的Task,并执行它(可同时执行多个Task)。

Q3: 什么是DAG

全称是 Directed Acyclic Graph,中文名是有向无环图。Spark就是借用了DAG对RDD之间的关系进行了建模,用来描述RDD之间的因果依赖关系。因为在一个Spark作业调度中,多个作业任务之间也是相互依赖的,有些任务需要在一些任务执行完成了才可以执行的。在Spark调度中就是有DAG scheduler,它负责将job分成若干组Task组成的Stage。

Q4: Spark的部署模式有哪些

主要有local模式、Standalone模式、Mesos模式、YARN模式。

  • Standalone: 独立模式,Spark 原生的简单集群管理器, 自带完整的服务, 可单独部署到一个集群中,无需依赖任何其他资源管理系统, 使用 Standalone 可以很方便地搭建一个集群,一般在公司内部没有搭建其他资源管理框架的时候才会使用。
  • Mesos:一个强大的分布式资源管理框架,它允许多种不同的框架部署在其上,包括 yarn.
  • YARN: 统一的资源管理机制, 在上面可以运行多套计算框架, 如map reduce、storm 等, 根据 driver 在集群中的位置不同,分为 yarn client 和 yarn cluster。

实际上Spark内部为了方便用户测试,自身也提供了一些部署模式。由于在实际工厂环境下使用的绝大多数的集群管理器是 Hadoop YARN,因此我们关注的重点是 Hadoop YARN 模式下的 Spark 集群部署。

用户在提交任务给 Spark 处理时,以下两个参数共同决定了 Spark 的运行方式。
·– master MASTER_URL :决定了 Spark 任务提交给哪种集群处理。
·– deploy-mode DEPLOY_MODE:决定了 Driver 的运行方式,可选值为 Client或者 Cluster。

Q5: Shuffle操作是什么

Shuffle指的是数据从Map端到Reduce端的数据传输过程,Shuffle性能的高低直接会影响程序的性能。因为Reduce task需要跨节点去拉在分布在不同节点上的Map task计算结果,这一个过程是需要有磁盘IO消耗以及数据网络传输的消耗的,所以需要根据实际数据情况进行适当调整。另外,Shuffle可以分为两部分,分别是Map阶段的数据准备与Reduce阶段的数据拷贝处理,在Map端我们叫Shuffle Write,在Reduce端我们叫Shuffle Read。

Q6: 什么是惰性执行

这是RDD的一个特性,在RDD中的算子可以分为Transformation算子和Action算子,其中Transformation算子的操作都不会真正执行,只会记录一下依赖关系,直到遇见了Action算子,在这之前的所有Transform操作才会被触发计算,这就是所谓的惰性执行。具体哪些是Transformation和Action算子,可以看下一节。

三 Spark与Pyspark架构

Spark集群由Driver, Cluster Manager(Standalone,Yarn 或 Mesos),以及Worker Node组成。对于每个Spark应用程序,Worker Node上存在一个Executor进程,Executor进程中包括多个Task线程。

对于pyspark,为了不破坏Spark已有的运行时架构,Spark在外围包装一层Python API。在Driver端,借助Py4j实现Python和Java的交互,进而实现通过Python编写Spark应用程序。在Executor端,则不需要借助Py4j,因为Executor端运行的Task逻辑是由Driver发过来的,那是序列化后的字节码。

其中白色部分是新增的Python进程,在Driver端,通过Py4j实现在Python中调用Java的方法,即将用户写的PySpark程序”映射”到JVM中.

例如,用户在PySpark中实例化一个Python的SparkContext对象,最终会在JVM中实例化Scala的SparkContext对象;在Executor端,则不需要借助Py4j,因为Executor端运行的Task逻辑是由Driver发过来的,那是序列化后的字节码,虽然里面可能包含有用户定义的Python函数或Lambda表达式,Py4j并不能实现在Java里调用Python的方法,为了能在Executor端运行用户定义的Python函数或Lambda表达式,则需要为每个Task单独启一个Python进程,通过socket通信方式将Python函数或Lambda表达式发给Python进程执行。

语言层面的交互总体流程如下图所示,实线表示方法调用,虚线表示结果返回。

四 PySpark简介

Apache Spark是用Scala编程语言编写的。为了用Spark支持Python,Apache Spark社区发布了一个工具PySpark。使用PySpark,您也可以使用Python编程语言处理RDD。正是由于一个名为Py4j的库,他们才能实现这一目标。

运行pyspark的方式

pyspark主要通过以下一些方式运行。
1 通过pyspark进入pyspark单机交互式环境。
这种方式一般用来测试代码。
也可以指定jupyter或者ipython为交互环境。

2 通过spark-submit提交Spark任务到集群运行。
这种方式可以提交Python脚本或者Jar包到集群上让成百上千个机器运行任务。
这也是工业界生产中通常使用spark的方式。

3 Python安装findspark和pyspark库。
可以在jupyter和其它Python环境中像调用普通库一样地调用pyspark库。
这也是本书配置pyspark练习环境的方式。

通过spark-submit提交任务到集群运行常见问题

以下为在集群上运行pyspark时相关的一些问题,
1,pyspark是否能够调用Scala或者Java开发的jar包?
答:只有Driver中能够调用jar包,通过Py4J进行调用,在executors中无法调用。
2,pyspark如何在excutors中安装诸如pandas,numpy等包?
答:可以通过conda建立Python环境,然后将其压缩成zip文件上传到hdfs中,并在提交任务时指定环境。
当然,最简单直接的方案是把你想要的anaconda环境打包成zip上传到集群hdfs环境中。注意,你打包的机器应当和集群的机器具有相同的linux操作系统。
3,pyspark如何添加自己编写的其它Python脚本到excutors中的PYTHONPATH中?
答:可以用py-files参数设置,可以添加.py,.egg 或者压缩成.zip的Python脚本,在excutors中可以import它们。
4,pyspark如何添加一些配置文件到各个excutors中的工作路径中?
答:可以用files参数设置,不同文件名之间以逗号分隔,在excutors中用SparkFiles.get(fileName)获取。

#提交python写的任务
spark-submit --master yarn \
--deploy-mode cluster \
--executor-memory 12G \
--driver-memory 12G \
--num-executors 100 \
--executor-cores 2 \
--conf spark.yarn.maxAppAttempts=2 \
--conf spark.default.parallelism=1600 \
--conf spark.sql.shuffle.partitions=1600 \
--conf spark.memory.offHeap.enabled=true \
--conf spark.memory.offHeap.size=2g\
--conf spark.task.maxFailures=10 \
--conf spark.stage.maxConsecutiveAttempts=10 \
--conf spark.yarn.appMasterEnv.PYSPARK_PYTHON=./anaconda3.zip/anaconda3/bin/python #指定excutors的Python环境
--conf spark.yarn.appMasterEnv.PYSPARK_DRIVER_PYTHON = ./anaconda3.zip/anaconda3/bin/python  #cluster模式时候设置
--archives viewfs:///user/hadoop-xxx/yyy/anaconda3.zip #上传到hdfs的Python环境
--files  data.csv,profile.txt
--py-files  pkg.py,tqdm.py
pyspark_demo.py

pyspark spark-submit 集群提交任务以及引入虚拟环境依赖包攻略:https://www.cnblogs.com/piperck/p/10121097.html

PySpark - SparkContext

SparkContext是任何spark功能的入口点。当我们运行任何Spark应用程序时,会启动一个驱动程序,它具有main函数,并且此处启动了SparkContext。然后,驱动程序在工作节点上的执行程序内运行操作。

SparkContext使用Py4J启动JVM并创建JavaSparkContext。默认情况下,PySpark将SparkContext作为'sc'提供,因此创建新的SparkContext将不起作用。

以下代码块包含PySpark类的详细信息以及SparkContext可以采用的参数。

class pyspark.SparkContext (master = None,appName = None, sparkHome = None, pyFiles = None, environment = None, batchSize = 0, serializer = PickleSerializer(), conf = None, gateway = None, jsc = None, profiler_cls = <class 'pyspark.profiler.BasicProfiler'>
)

以下是SparkContext的参数具体含义:

  • Master- 它是连接到的集群的URL。
  • appName- 您的工作名称。
  • sparkHome - Spark安装目录。
  • pyFiles - 要发送到集群并添加到PYTHONPATH的.zip或.py文件。
  • environment - 工作节点环境变量。
  • batchSize - 表示为单个Java对象的Python对象的数量。设置1以禁用批处理,设置0以根据对象大小自动选择批处理大小,或设置为-1以使用无限批处理大小。
  • serializer- RDD序列化器。
  • Conf - L {SparkConf}的一个对象,用于设置所有Spark属性。
  • gateway - 使用现有网关和JVM,否则初始化新JVM。
  • JSC - JavaSparkContext实例。
  • profiler_cls - 用于进行性能分析的一类自定义Profiler(默认为pyspark.profiler.BasicProfiler)

在上述参数中,主要使用master和appname。任何PySpark程序的会使用以下两行:

from pyspark import SparkContext
sc = SparkContext("local", "Hello Pyspark")

3.1 SparkContext示例 - PySpark Shell

现在你对SparkContext有了足够的了解,让我们在PySpark shell上运行一个简单的例子。在这个例子中,我们将计算README.md文件中带有字符“a”或“b”的行数。那么,让我们说如果一个文件中有5行,3行有字符'a',那么输出将是→ Line with a:3。字符'b'也是如此。

我们不会在以下示例中创建任何SparkContext对象,因为默认情况下,当PySpark shell启动时,Spark会自动创建名为sc的SparkContext对象。如果您尝试创建另一个SparkContext对象,您将收到以下错误 - “ValueError:无法一次运行多个SparkContexts”。

在终端输入pyspark 启动PySpark Shell:

>>> logFile="file:user/local/hadoop-2.8.5/README.txt"
>>> logData=sc.textFile(logFile).cache()
>>> numAs=logData.filter(lambda s:'a' in s).count()
>>> numBs=logData.filter(lambda s:'b' in s).count()
>>> print("Line with a:%i,line with b:%i" % (numAs,numBs))
Line with a:20, line with b:10

3.2 SparkContext示例 - Python程序

让我们使用Python程序运行相同的示例。创建一个名为demo.py的Python文件,并在该文件中输入以下代码。

from pyspark import SparkContext
logFile = "file:usr/local/hadoop-2.8.5/README.txt"
sc = SparkContext("local", "Hello PySpark")
logData = sc.textFile(logFile).cache()
numAs = logData.filter(lambda s: 'a' in s).count()
numBs = logData.filter(lambda s: 'b' in s).count()
print("Line with a:%i,lines with b :%i" % (numAs, numBs))

然后我们将在终端中执行以下命令来运行此Python文件。我们将得到与上面相同的输出。

spark-submit demo.py 

3.3 Spark -- WordCount词频统计

# -*- coding: utf-8 -*-
import sys
import os
import datetime
from pyspark import SparkConf,SparkContextsc = SparkConf().setAppName("wordcount")
spark = SparkContext(conf=sc)text_file = spark.textFile("hdfs://examples/pyspark/words.txt")
word_cnt_rdd = text_file.flatMap(lambda line : line.split(' ')).map(lambda word : (word, 1)).reduceByKey(lambda x, y: x + y)
word_cnt_rdd.saveAsTextFile('hdfs://user/wordcount_result') 
#spark-cluster-mode
./spark-submit \
--verbose \
--master yarn \
--deploy-mode cluster \
--num-executors 10 \
--executor-cores 1 \
--executor-memory 8G \
--driver-memory 4G \
--conf spark.pyspark.python=python3 \
wordcount.py#spark-client-mode
./spark-submit \
--verbose \
--master yarn \
--deploy-mode client \
--num-executors 10 \
--executor-cores 1 \
--executor-memory 8G \
--driver-memory 4G \
--conf spark.pyspark.python=python3 \
--conf spark.pyspark.driver.python=python3 \
wordcount.py

学习资源推荐:

1)edureka about PySpark Tutorial

印度老哥的课程,B站可直接看,不过口音略难听懂不过还好有字幕。

https://www.bilibili.com/video/BV1i4411i79a?p=1

2)10天吃掉那只pyspark - Heywhale.com

https://github.com/lyhue1991/eat_pyspark_in_10_days

3)官方文档

PySpark Documentation — PySpark 3.2.0 documentation

PySpark Tutorial

全面解析Spark,以及和Python的对接

Spark的三种集群部署模式

大数据入门与实战-PySpark的使用教程

Mac下安装spark,并配置pycharm-pyspark完整教程

https://blog.csdn.net/shiyutianming/article/details/99946797

[Spark]PySpark入门学习教程---介绍(1)相关推荐

  1. [Spark]PySpark入门学习教程---RDD介绍(2)

    一 RDD pyspark.RDD        SparkRDD RDD指的是弹性分布式数据集(Resilient Distributed Dataset),它是spark计算的核心.尽管现在都使用 ...

  2. [Spark]PySpark入门学习教程---例子RDD与DataFrame

    一 例子说明 用spark的RDD与DataFrame两种方式实现如下功能 1.合并主特征与单特征 2.对标签进行过滤 3.标签与特征进行合并 4.输出指定格式最后的数据 二 数据说明 包括三个文件: ...

  3. EDEM入门学习教程—界面介绍

    大家好,这个假期比较无聊,以EDEM2022为例,码一篇EDEM入门学习的教程.先介绍EDEM界面. 打开EDEM,上面这一块是标题栏,右面是图形界面,左边是模型树. 先看标题栏,主要介绍中间这一条, ...

  4. MAYA 2022基础入门学习教程

    流派:电子学习| MP4 |视频:h264,1280×720 |音频:AAC,48.0 KHz 语言:英语+中英文字幕(根据原英文字幕机译更准确)|大小解压后:3.41 GB |时长:4.5小时 包含 ...

  5. 三维地形制作软件 World Machine 基础入门学习教程

    <World Machine课程>涵盖了你需要的一切,让你有一个坚实的基础来构建自己的高质量的电影或视频游戏地形. 你会学到什么 为渲染或游戏开发创建高分辨率.高细节的地形. 基于Worl ...

  6. 3dmax2021入门学习教程

    3dmax2021入门学习教程 MP4 | h264,1280x720 |语言:英语+中文字幕(根据原英文字幕机译)+原英文字幕 | 4h 23m | 3.55 GB 含课程素材工程文件 云桥网络 平 ...

  7. linux vim配置c,Linux入门学习教程:GNU C及将Vim打造成C/C++的半自动化IDE

    C语言在Linux系统中的重要性自然是无与伦比.不可替代,所以我写Linux江湖系列不可能不提C语言.C语言是我的启蒙语言,感谢C语言带领我进入了程序世界.虽然现在不靠它吃饭,但是仍免不了经常和它打交 ...

  8. 微风:AI新手入门学习教程

    大家好我是微风,一个爱设计爱生活的平面设计师,最近总有一些朋友问我,零基础学习AI软件好学吗,AI新手学习软件好操作嘛,那么今天的这篇文章主要给大家介绍下新手AI新手入门学习教程以及学习平面设计分为哪 ...

  9. 【OpenCV图像处理入门学习教程六】基于Python的网络爬虫与OpenCV扩展库中的人脸识别算法比较

    OpenCV图像处理入门学习教程系列,上一篇第五篇:基于背景差分法的视频目标运动侦测 一.网络爬虫简介(Python3) 网络爬虫,大家应该不陌生了.接下来援引一些Jack-Cui在专栏<Pyt ...

最新文章

  1. Linux dig
  2. 关于重复接收NSNotificationCenter发送的通知的问题
  3. 使用Androidkiller编译APK文件时出现libpng error: Not a PNG file的错误
  4. (转) 基于MapReduce的ItemBase推荐算法的共现矩阵实现(一)
  5. python中的递归函数是什么_Python中的递归函数是什么
  6. java应用程序如何编译运作_开发Java应用程序的基本步骤是: 1 编写源文件, 2.编译源文件, 3.运行程序。_学小易找答案...
  7. Vue3:集成wangEditor富文本编辑器
  8. matlab查表svpwm,SVPWM的MATLAB仿真实现
  9. VS2012安装部署教程
  10. SysLoad3.exe木马病毒地分析及清除方法
  11. 傅里叶分析斯坦恩中文版pdf_傅里叶分析
  12. 【LeetCode】75. 颜色分类,使得相同颜色的元素相邻
  13. android 模拟器 pubg,雷电安卓模拟器怎么玩绝地求生刺激战场 PC端带你愉快吃鸡...
  14. 创新抗生素获FDA优先审评资格 治疗社区获得性肺炎
  15. Python办公自动化word 中插入图片和表格
  16. Tomcat启动异常:A child container failed during start 与 ClassNotFoundException解决方法
  17. 关于java多态性之父类引用指向子类对象
  18. 【若依】开源框架学习笔记 07 - 登录认证流程(Spring Security 源码)
  19. GBase 8s灾备集群HAC (一) 概述
  20. [附源码]计算机毕业设计springboot美发店会员管理系统

热门文章

  1. spring boot项目开发中遇到问题,持续更新
  2. 从0开始的Python学习009参数
  3. 用php+ajax+echarts.js 实现统计每分钟答题曲线图
  4. Java 的抽象特性:抽象类与接口深度解析
  5. 我的Android进阶之旅------gt;解决Jackson等第三方转换Json的开发包在开启混淆后转换的实体类数据都是null的bug...
  6. DataTabel中关于ImpotRow的一点尝试
  7. 基于Solr DIH实现MySQL表数据全量索引和增量索引
  8. 第 8 章:管理模式对象
  9. eth显卡算力2020最新排行_最新三大主流币IPFS比特币ETH挖矿全网算力动态速递单周报(12.3更新)...
  10. mpandroidchart y轴从0开始_从零开始学Pytorch(十七)之目标检测基础