文章目录

  • 一、Spark Shuffle
    • 1. HashShuffleManager
    • 2. SortShuffleManager
    • 3. 总结
  • 二、Spark3.0新特性
    • 1. Adaptive Query Execution 自适应查询(SparkSQL)
    • 2. Dynamic Partition Pruning 动态分区裁剪(SparkSQL)
    • 3. 增强的Python API: PySpark和Koalas
      • 3.1 Koalas DataFrame构建
      • 3.2 Koalas DataFrame 查看数据
      • 3.3 Koalas DataFrame 缺失值处理
      • 3.4 Koalas DataFrame 分组计算
      • 3.5 Koalas DataFrame 数据导入导出
  • 三、Spark核心概念

传送门:

  • 视频地址:黑马程序员Spark全套视频教程
  • 1.PySpark基础入门(一)
  • 2.PySpark基础入门(二)
  • 3.PySpark核心编程(一)
  • 4.PySpark核心编程(二)
  • 5.PySaprk——SparkSQL学习(一)
  • 6.PySaprk——SparkSQL学习(二)
  • 7.Spark综合案例——零售业务统计分析
  • 8. Spark3新特性及核心概念(背)

一、Spark Shuffle

  在Shuffle过程中,提供数据的称之为Map端(Shuffle Write) 。接收数据的 称之为 Reduce端(Shuffle Read)。在Spark的两个阶段中,总是前一个阶段产生一批Map提供数据,下一阶段产生一批Reduce接收数据。
  Spark 提供2种Shuffle管理器:

  • HashShuffleManager:基于Hash分组进行Shuffle
  • SortShuffleManager:基于排序进行Shuffle

SortShuffleManager可以看做是HashShuffleManager的增强,在Shuffle阶段中,以最少的文件的产出来完成数据的相互交流。

1. HashShuffleManager


基本和未优化的一致,不同点在于:

  1. 在一个Executor内, 不同Task是共享Buffer缓冲区
  2. 这样减少了缓冲区乃至写入磁盘文件的数量, 提高性能

2. SortShuffleManager

  SortShuffleManager的运行机制主要分成两种,一种是普通运行机制,另一种是bypass运行机制。


bypass运行机制的触发条件如下:

  1. shuffle map task数量小于spark.shuffle.sort.bypassMergeThreshold=200参数的值。
  2. 不是聚合类的shuffle算子(比如reduceByKey)。

同普通机制基本类同,区别在于,写入磁盘临时文件的时候不会在内存中进行排序,而是直接写,最终合并为一个task一个最终文件。所以和普通模式IDE区别在于:

  1. 磁盘写机制不同;
  2. 不会进行排序。也就是说,启用该机制的最大好处在于,shuffle write过程中,不需要进行数据的排序操作,也就节省掉了这部分的性能开销。

3. 总结

  1. SortShuffle对比HashShuffle可以减少很多的磁盘文件,以节省网络IO的开销
  2. SortShuffle主要是对磁盘文件进行合并来进行文件数量的减少,同时两类Shuffle都需要经过内存缓冲区溢写磁盘的场景。所以可以得知,尽管Spark是内存迭代计算框架,但是内存迭代主要在窄依赖中。 在宽依赖(Shuffle)中磁盘交互还是一个无可避免的情况。 所以,我们要尽量减少Shuffle的出现,不要进行无意义的Shuffle计算。

二、Spark3.0新特性

1. Adaptive Query Execution 自适应查询(SparkSQL)

  由于缺乏或者不准确的数据统计信息(元数据)和对成本的错误估算(执行计划调度)导致生成的初始执行计划不理想。在Spark3.x版本提供Adaptive Query Execution自适应查询技术,通过在”运行时”对查询执行计划进行优化,允许Planner在运行时执行可选计划,这些可选计划将会基于运行时数据统计进行动态优化,从而提高性能。
Adaptive Query Execution(AQE)主要提供了三个自适应优化:

  • 动态合并 Shuffle Partitions
    可以动态调整shuffle分区的数量。用户可以在开始时设置相对较多的shuffle分区数,AQE会在运行时将相邻的小分区合并为较大的分区。
  • 动态调整Join策略
    此优化可以在一定程度上避免由于缺少统计信息或者错误估计大小(当然也可能两种情况同时存在),而导致执行计划性能不佳的情况。这种自适应优化可以在运行时sort merge join转换成broadcast hash join,从而进一步提升性能。
  • 动态优化倾斜Join(Skew Joins)
    skew joins可能导致负载的极端不平衡,并严重降低性能。在AQE从shuffle文件统计信息中检测到任何倾斜后,它可以将倾斜的分区分割成更小的分区,并将它们与另一侧的相应分区连接起来。这种优化可以并行化倾斜处理,获得更好的整体性能。

    触发条件:
  1. 分区大小 > spark.sql.adaptive.skewJoin.skewedPartitionFactor (default=10) * “median partition size(中位数分区大小)”
  2. 分区大小 > spark.sql.adaptive.skewJoin.skewedPartitionThresholdInBytes (default = 256MB)

AQE 总结:

  1. AQE的开启通过: spark.sql.adaptive.enabled 设置为true开启
  2. AQE是自动化优化机制, 无需我们设置复杂的参数调整, 开启AQE符合条件即可自动化应用AQE优化
  3. AQE带来了极大的SparkSQL性能提升

2. Dynamic Partition Pruning 动态分区裁剪(SparkSQL)

  当优化器在编译时无法识别可跳过的分区时,可以使用"动态分区裁剪",即基于运行时推断的信息来进一步进行分区裁剪。这在星型模型中很常见,星型模型是由一个或多个并且引用了任意数量的维度表的事实表组成。在这种连接操作中,我们可以通过识别维度表过滤之后的分区来裁剪从事实表中读取的分区。在一个TPC-DS基准测试中,102个查
询中有60个查询获得2到18倍的速度提升。

3. 增强的Python API: PySpark和Koalas

  下面是Koalas的入门演示。

3.1 Koalas DataFrame构建

>>> import pandas as pd
>>> import numpy as np
>>> import dataabricks.koalas as ks# 构建Pandas的DatetimeIndex
>>> dates = pd.date_range('20130101', periods=6)
>>> dates
DatetimeIndex(['2013-01-01', '2013-01-02', '2013-01-03', '2013-01-04','2013-01-05', '2013-01-06'],dtype='datetime64[ns]', freq='D')# 构建Pandas的DataFrame
>>> pdf = pd.DataFrame(np.random.randn(6, 4), index=dates, columns=list('ABCD'))
>>> pdfA         B         C         D
2013-01-01 -1.637405  1.037654 -0.144016  0.295328
2013-01-02 -0.259776  2.002485 -0.384515  0.053215
2013-01-03 -0.542287  0.083243 -0.845770  0.670884
2013-01-04  1.181680  1.006979  0.237547  0.529390
2013-01-05 -1.072508  0.330510 -1.154201 -0.798589
2013-01-06 -0.016008  0.553928 -0.660023  0.640660# 基于PDF构建Koalas DataFrame
>>> kdf = ks.from_pandas(pdf)
>>> kdfA         B         C         D
2013-01-01 -1.637405  1.037654 -0.144016  0.295328
2013-01-02 -0.259776  2.002485 -0.384515  0.053215
2013-01-03 -0.542287  0.083243 -0.845770  0.670884
2013-01-04  1.181680  1.006979  0.237547  0.529390
2013-01-05 -1.072508  0.330510 -1.154201 -0.798589
2013-01-06 -0.016008  0.553928 -0.660023  0.640660
>>> type(kdf)
<class 'databricks.koalas.frame.DataFrame'># 基于SparkSession构建
# 先转换PandasDataFrame成SparkDataFrame
>>> sdf = spark.createDataFrame(pdf)# 转换SparkDataFrame到KoalasDataFrame
>>> kdf = sdf.to_koalas()
>>> kdf
22/06/29 16:41:19 WARN WindowExec: No Partition Defined for Window operation! Moving all data to a single partition, this can cause serious performance degradation.A         B         C         D
0 -1.637405  1.037654 -0.144016  0.295328
1 -0.259776  2.002485 -0.384515  0.053215
2 -0.542287  0.083243 -0.845770  0.670884
3  1.181680  1.006979  0.237547  0.529390
4 -1.072508  0.330510 -1.154201 -0.798589
5 -0.016008  0.553928 -0.660023  0.640660# 或者直接创建kdf也可以
>>> kdf = ks.DataFrame({'A': ['foo', 'bar', 'foo', 'bar',
... 'foo', 'bar', 'foo', 'foo'],
... 'B': ['one', 'one', 'two', 'three',
... 'two', 'two', 'one', 'three'],
... 'C': np.random.randn(8),
... 'D': np.random.randn(8)})
>>> kdfA      B         C         D
0  foo    one  0.262542  1.335999
1  bar    one -0.173848 -0.403173
2  foo    two  0.399890 -2.411173
3  bar  three -0.052464  1.500888
4  foo    two -1.417023 -0.133239
5  bar    two -0.980970  0.757272
6  foo    one  0.924974 -0.378872
7  foo  three  0.158485  0.809543

3.2 Koalas DataFrame 查看数据

>>> kdfA      B         C         D
0  foo    one  0.262542  1.335999
1  bar    one -0.173848 -0.403173
2  foo    two  0.399890 -2.411173
3  bar  three -0.052464  1.500888
4  foo    two -1.417023 -0.133239
5  bar    two -0.980970  0.757272
6  foo    one  0.924974 -0.378872
7  foo  three  0.158485  0.809543>>> kdf.head()A      B         C         D
0  foo    one  0.262542  1.335999
1  bar    one -0.173848 -0.403173
2  foo    two  0.399890 -2.411173
3  bar  three -0.052464  1.500888
4  foo    two -1.417023 -0.133239>>> kdf.index
Int64Index([0, 1, 2, 3, 4, 5, 6, 7], dtype='int64')>>> kdf.columns
Index(['A', 'B', 'C', 'D'], dtype='object')>>> kdf.to_numpy()
array([['foo', 'one', 0.26254175968455545, 1.3359991506379416],['bar', 'one', -0.17384830238935162, -0.40317287734535234],['foo', 'two', 0.39988960259487943, -2.4111732433239337],['bar', 'three', -0.052464404153064, 1.500888245546042],['foo', 'two', -1.417022830092877, -0.13323948132553412],['bar', 'two', -0.9809699243901808, 0.7572717915953346],['foo', 'one', 0.9249735193346398, -0.3788721775239115],['foo', 'three', 0.15848531319616643, 0.8095431323211872]],dtype=object)>>> kdf.describe()C         D
count  8.000000  8.000000
mean  -0.109802  0.134656
std    0.757469  1.268234
min   -1.417023 -2.411173
25%   -0.980970 -0.403173
50%   -0.052464 -0.133239
75%    0.262542  0.809543
max    0.924974  1.500888>>> kdf.sort_index(ascending=False)A      B         C         D
7  foo  three  0.158485  0.809543
6  foo    one  0.924974 -0.378872
5  bar    two -0.980970  0.757272
4  foo    two -1.417023 -0.133239
3  bar  three -0.052464  1.500888
2  foo    two  0.399890 -2.411173
1  bar    one -0.173848 -0.403173
0  foo    one  0.262542  1.335999>>> kdf.sort_values(by='B')A      B         C         D
0  foo    one  0.262542  1.335999
1  bar    one -0.173848 -0.403173
6  foo    one  0.924974 -0.378872
3  bar  three -0.052464  1.500888
7  foo  three  0.158485  0.809543
2  foo    two  0.399890 -2.411173
4  foo    two -1.417023 -0.133239
5  bar    two -0.980970  0.757272

3.3 Koalas DataFrame 缺失值处理

3.4 Koalas DataFrame 分组计算

3.5 Koalas DataFrame 数据导入导出



三、Spark核心概念

PySpark | Spark3新特性 | Spark核心概念相关推荐

  1. Spark学习之路---Spark核心概念

    Spark核心概念简介 一个Spark应用都由驱动器程序发起集群上的各种并发操作,一个驱动器程序一般包含多个执行器节点,驱动器程序通过一个SaprkContext对象访问saprk.RDD(弹性分布式 ...

  2. lsdyna如何设置set中的node_list_如何快速掌握es6+新特性及核心语法?

    国庆刚刚结束,我们开始一波新的学习进程吧. ECMAScript 6.0(以下简称ES6)是JavaScript语言的下一代标准,已经在2015年6月正式发布了.作为前端必备技能,我们来快速开始吧 接 ...

  3. 一文快速掌握 es6+新特性及核心语法

    首先先祝各位节日快乐,好好去体验生活的快乐,在假期最后一天里,祝大家收获满满,同时抓住假期的尾巴,收割实用技能. 接下来我会总结一些工作中常用也比较核心的es6+的语法知识,后面又要慢慢开始工作之旅了 ...

  4. Spark入门-了解Spark核心概念

    在本文中我们将从Spark集群角度和程序应用的角度来对相关概念进行了解 一. 站在集群角度 1.1 Master Spark 特有资源调度系统的 Leader.掌管着整个集群的资源信息,类似于 Yar ...

  5. Spark1.0新特性--Spark SQL

    Spark1.0出来了,变化还是挺大的,文档比以前齐全了,RDD支持的操作比以前多了一些,Spark on yarn功能我居然跑通了.但是最最重要的就是多了一个Spark SQL的功能,它能对RDD进 ...

  6. PySpark | RDD持久化 | 共享变量 | Spark内核调度

    文章目录 一.RDD持久化 1.RDD的数据是过程数据 2.RDD缓存 2.1 RDD缓存的特点 2.2 cache()与unpersist()实战 3.RDD CheckPoint 3.1 Chec ...

  7. html5,css3,es6新特性总结

    html5新特性 语义化标签 如:header,footer,nav,dialog 增强型表单 如:date,week,url,time,email,month 视频和音频 audio和video C ...

  8. 玩转ECS第8讲 | 服务器迁移中心SMC最佳实践及新特性介绍

    简介:本次分享由阿里云技术专家白辉万(百宝)为大家介绍免费的服务器迁移上云最佳实践方案和新功能特性,包括一键迁云.自动定期同步.一键验证.本次分享内容将帮助企业上云客户越过高高的服务器迁移门槛,快速体 ...

  9. new php,PHP: 新特性 - Manual

    新特性 PHP 核心中的新特性 属性添加限定类型 类的属性中现在支持添加指定的类型. publicint $id; publicstring $name; }?> 上面的例子中,会强制要求 $u ...

最新文章

  1. C语言实现bmp图像锐化
  2. 小程序获取用户所在城市完整代码
  3. 100行代码搞定实时视频人脸表情识别(附代码)
  4. easyui_动态添加隐藏toolbar按钮
  5. Vista下的Asp.net Mvc安装
  6. 嵌入式Linux C笔试题积累(转)
  7. python主线程执行_Python队列 – 最多运行n个线程
  8. class路径快捷 xml配置_SpringBoot 配置文件详解(告别XML)-class文件
  9. C#树状图 初始默认选中节点
  10. php是什么电器元件,电阻器是电子、电器设备中常使用的一种基本电子元件
  11. Linux实战教学笔记29:MySQL数据库企业级应用实践
  12. 【解决】linux磁盘扩容大全:新增磁盘、原磁盘扩容、home分区root分区扩容
  13. 计算机为动态分区无法安装系统,磁盘动态分区形式的电脑怎么重装系统win10
  14. 3脚送话器内部电路图_教你看电路图(全)(20个经典实例)..
  15. 华为AC外置Portal认证方案配置步骤指南
  16. 测试人员与开发人员的比例究竟多少是合理的?
  17. python怎么变成动图_python可以做动图吗
  18. 我的世界手机有PHP的开服器下载,【BDS】MC基岩版官方开服器Windows版插件开发包...
  19. springboot+redistemplate 集群配置
  20. Android 笔记 沉浸式状态栏设置及效果说明

热门文章

  1. svn执行update操作后出现:Error : Previous operation has not finished; run 'cleanup' if it was interrupted.
  2. 如何用教科书式的方法,着手分析一个行业?
  3. 阿克曼转速度指令 cmd_vel
  4. 质量保证QA与质量控制QC
  5. EXCEL文本函数-数字小写换大写-英文大小写互换
  6. ThingWorx入门
  7. 修复windows修复引导_如何使用安全模式修复Windows PC(以及何时应该修复)
  8. java framemaker教程_Freemarker入门案例
  9. google authenticator python_谷歌验证器(Google Authenticator)
  10. apollo 阿波罗使用