Spark 3.2.0 版本新特性 push-based shuffle 论文详解(一)概要和介绍
前言
本文隶属于专栏《大数据技术体系》,该专栏为笔者原创,引用请注明来源,不足和错误之处请在评论区帮忙指出,谢谢!
本专栏目录结构和参考文献请见大数据技术体系
目录
Spark 3.2.0 版本新特性 push-based shuffle 论文详解(一)概要和介绍
Spark 3.2.0 版本新特性 push-based shuffle 论文详解(二)背景和动机
Spark 3.2.0 版本新特性 push-based shuffle 论文详解(三)系统设计
Spark 3.2.0 版本新特性 push-based shuffle 论文详解(四)实现优化
Spark 3.2.0 版本新特性 push-based shuffle 论文详解(五)评估结果
Spark 3.2.0 版本新特性 push-based shuffle 论文详解(六)相关工作
Spark 3.2.0 版本新特性 push-based shuffle 论文详解(七)结论
思维导图
论文下载地址
SPIP: Support push-based shuffle to improve shuffle efficiency
概要
在过去十年中,Apache Spark 已成为大规模数据处理的流行计算引擎。
类似于基于 MapReduce 计算模式的其他计算引擎,Shuffle 操作,即中间数据的全量传输,在 Spark 中起重要作用。
在 LinkedIn,随着数据尺寸和Spark 部署的规模的快速增长,Shuffle 操作正在成为进一步扩展基础设施的瓶颈。
这导致了整体作业缓慢,甚至是长期运行工作的失败。
因为解决此类缓慢和故障问题不仅会影响到开发人员的生产力,而且还导致基础设施的高运营成本。
在这项工作中,我们描述了影响 Shuffle 可伸缩性的主要瓶颈。
我们提出 Magnet
,一种新型的 Shuffle
机制,可以灵活的处理 PB 级别的日常 Shuffle 数据和几千个节点的集群。
Magnet
旨在和 on-prem
和cloud-based
的集群部署一起使用。
On-premises
(on-prem)指传统的部署模型,软件运行在客户的环境中,通常在专用 VPC(虚拟机) 内。特别是,服务存储或处理的所有数据都保留在客户的云中。
cloud-based
是一个术语,指的是云计算提供商的服务器通过互联网按需向用户提供的应用程序、服务或资源。公司通常使用基于云的计算来增加容量、增强功能或按需添加额外服务,而无需承诺潜在的昂贵基础设施成本或增加/培训现有的内部支持人员。
它通过将碎片化的Shuffle 中间数据合并到大的 Block 块中,从而找到了关键性的影响 Shuffle 可伸缩性的瓶颈。
Magnet
通过协同 Reduce 任务的合并块来提供进一步的改进。
我们的基准测试表明,Magnet
显着提高了独立于底层硬件的 Shuffle 性能。
Magnet
减少了 LinkedIn 生产环境端到端的运行时间接近30%。
此外,Magnet
通过删除来自用户的 Shuffle 相关的调优负担来提高用户的生产力。
介绍
分布式数据处理框架(如 Hadoop 和 Spark)过去十年在大规模数据分析用例中得到了普及。
基于 MapReduce 计算模式,利用大规模的商业机器,这些分布式数据处理框架表现出了良好的伸缩性和广泛性以适用于不同的用例,从数据分析到机器学习和 AI。
在最近几年,一系列现代化计算引擎,如 Spark SQL,Presto 和 Flink ,诞生并成为了主流。
Spark SQL 请参考我的博客——Spark SQL 是什么?
Presto 请参考我的博客——一篇文章了解 Presto
Flink 请参考我的博客——Flink 是什么?有什么特点?
与 Hadoop MapReduce 不同,在传给 DAG(有向无环图)执行引擎以执行优化的操作之前,这些现代化计算引擎利用 SQL 优化器来优化用户指定的计算逻辑。
Hadoop MapReduce 的工作流程请参考我的博客——图文详解 MapReduce 工作流程
以 Spark 为例(图1)。
图 1:计算的逻辑计划和一次 Spark 查询中相关联的 Stage DAG
假设用户想要在基于确定的条件过滤掉 join 的结果之前,在表 job_post_view
和表 job_dimension
之间 inner join
。
在这个例子中,前表包含哪个成员看到了哪个作业传到了 LinkedIn 平台的追踪信息。
后者包含每个作业传递的详细信息。
Spark 通过在 join 操作之前下推过滤条件来优化这个查询(图1(a))。
然后,Spark 的 DAG 执行引擎采用这个优化过的计算计划,并将其转换为一到多个作业。
每个作业包含多个 Stage 的 DAG,代表一个血缘关系,表示怎样转换数据来产生当前作业的最终结果(图1(b))。
Stages 之间的中间数据通过 Shuffle 操作来传输。
中间数据通过对应 Stages 的 Map Task 和 Reduce Task 之间的全量连接进行传输,Shuffle 操作是 MapReduce 计算模式的关键所在。
虽然 Shuffle 操作的基本概念直截了当,但不同的框架采取了不同的方法来实现它。
一些框架 如Presto
和 Flink Streaming
在内存中物化了Shuffle 中间数据,以实现低延迟的需求,而其他的比如 Spark
和 Flink Batch
将其物化到本地磁盘上以获得更好的容错性。
在磁盘上物化Shuffle 中间数据时,一种解决方案是基于哈希,每个 Map Task 为每个 Reduce Task 产生单独的文件,另一种解决方案是基于排序,Map Task 的输出通过分区键的 Hash 值排序,物化成单独的文件。
虽然基于排序的 Shuffle 会引发排序的开销,但是当 Shuffle 中间数据过大时,它会是一个更加性能和可扩展的解决方案。
Spark 和 Flink 等框架采用了外部的 Shuffle 服务用来物化 Shuffle 中间数据,以实现更好的容错性和隔离性。
随着近来网络和存储硬件的改进,某些解决方案将 Shuffle 中间数据进行了disaggregated storage
而不是本地存储。
disaggregated storage
是计算机数据中心内的一种数据存储模式。
- 它允许将计算机服务器中的计算资源与存储资源分开,而无需修改任何物理连接。
- 它是一种可组合的分类基础设施形式,允许通过网络结构连接资源,在升级、替换或添加单个资源时提供灵活性。
- 它还允许为未来的增长构建服务器,在不影响吞吐量和延迟的情况下,提供比传统数据存储更高的存储效率、规模和性能。
- 它是一种扩展存储形式,由一些存储设备构建,这些存储设备可以作为一个逻辑存储池,可以通过非常高性能的网络结构分配给网络上的任何服务器。
- 它解决了存储区域网络或直接连接存储的局限性。
- 它是动态可重新配置的,并优化重新配置物理资源,以最大限度地提高性能并限制延迟。
- 它具有存储区域网络的灵活性,可提供本地存储的性能。
其他解决方案则跳过了物化中间数据,通过将 Map Task 的输出直接推送给 Reduce Task 来实现低延迟。
Shuffle 实现上的差异为这些计算引擎中的 Shuffle 操作提供了丰富的优化空间。
事实上,改进 Spark 中 Shuffle 的关键在于赢得 Sort Benchmark。
Apache spark the fastest open source engine for sorting a petabyte.
在 LinkedIn,作为全球专业的社交网络公司,我们数千个节点 的 Spark 生产集群上每天运行着大量的批量分析和机器学习工作。
这就导致每天有 PB 级别的数据要被 Spark Shuffle。
当处理如此庞大的数据时,Shffle 操作就对 Spark 基础设施建设变得至关重要。
Spark 以基于排序的方式物化 Shuffle 数据到磁盘上,并提供外部 Shuffle 服务。
虽然这在容错性和性能之间达到了良好的平衡,但 LinkedIn 内部 Spark 工作负载的快速增长仍然造成了很多挑战。
首先,建立全量连接从 Map Task 到 Reduce Task 传输数据,这个需求正在面临着可靠性问题。
在具有数千个节点的集群中,每个节点间歇性的可用性问题相当的常见。
在高峰时段,增加的 Shuffle 工作负载也会加重部署的 Shuffle 服务,使得连接失败的可能性增加。
其次,Shuffle 期间的磁盘 I/O 操作也出现了效率问题。
将物化的 Spark Shuffle 数据切分成小的 Shuffle 块,这些块会以随机的顺序被独立的获取。
这些块通常都很小。
LinkedIn 的 Spark 集群中的平均块大小只有 10 KB。
每天在我们的集群上面都会读取数十亿这样的块,如果使用的是机械磁盘(HDD)的话,会对磁盘造成很大的负担。
细小的随机磁盘读取,结合其他的类似网络 I/O 的开销,会导致获取 Shuffle 数据的延迟增加。
我们集群中 Spark 总体的计算资源大概有 15% 都浪费在了这种延迟上面。
最后,随着 Shuffle 数据大小的增长,平均块大小的减少还引入了可伸缩性问题。
随着我们的 Spark 工作负载趋向于处理更多的数据,这种效率问题逐渐变得更糟糕。
一些虽然有很多小块但是却很少配置的 Spark 应用进一步的加剧了这个问题。
虽然使用固态磁盘(SSD)来存储 Shuffle 数据,这种解决方案可以帮助缓解这些问题,但是在 LinkedIn 的集群规模上面使用固态磁盘来替换机械磁盘并不实用。
有关这一点的更多细节会在第 2.2 节讨论。
此外,由于网络的开销,利用disaggregated storage
的cloud-based
的集群部署,也深受小规模读取的困扰。
为了解决这些挑战,我们提出了一个名为Magnet
的可选 Shuffle 机制。
通过Magnet
,我们将每个 Map Task 产生的碎片化的 Shuffle 块推向远程的 Shuffle 服务,并适时地将它们合入每个 Shuffle 分区的大块中。
这会有下面的几点好处:
Magnet
适时地将碎片化的 Shuffle 中间数据合并成大块,并将其与 Reduce Task 协作。这使得Magnet
显著地提升了 Shuffle 操作的效率,并且独立于底层存储硬件,减少了作业端到端的运行时间。Magnet
采用混合方法,其中合并和未合并的 Shuffle 数据都可以用作 Reduce Task 的输入。这有助于在磁盘 Shuffle 期间提高可靠性。Magnet
旨在适用于on-prem
或者cloud-based
的部署,并且可以扩展到每天处理 PB 级别的 Shuffle 中间数据,其中包含数千个节点。
本文的其余部分组织如下:
第2节介绍了 Spark Shuffle 操作并讨论存在的问题。
第3节介绍了Magnet
的详细设计。
第4节介绍了Magnet
实现中采用的一些优化。
第5节给出了评估设置,关键结果和分析。
第6节关于相关工作的讨论。
我们第7节给本文做了总结。
Spark 3.2.0 版本新特性 push-based shuffle 论文详解(一)概要和介绍相关推荐
- Spark 3.2.0 版本新特性 push-based shuffle 论文详解(二)背景和动机
前言 本文隶属于专栏<大数据技术体系>,该专栏为笔者原创,引用请注明来源,不足和错误之处请在评论区帮忙指出,谢谢! 本专栏目录结构和参考文献请见大数据技术体系 目录 Spark 3.2.0 ...
- pytorch gather_【Pytorch】Pytorch-1.1.0 版本新特性
2019年05月01日,Pytorch 1.1.0 版本正式发布啦~https://github.com/pytorch/pytorch/releases/tag/v1.1.0 主要的几个功能: 1. ...
- java8新特性(四)_Stream详解
之前写过一篇用stream处理map的文章,但是对stream没有一个整体的认识,这次结合并发编程网和ibm中介绍stream的文章进行一个总结,我会着重写对list的处理,毕竟实际工作中大家每天进行 ...
- C++11、C++14、C++17、C++20新特性总结(5万字详解)
文章目录 C++ 11是什么,C++ 11标准的由来 C++ auto类型推导完全攻略 auto 类型推导的语法和规则 auto 的高级用法 auto 的限制 auto 的应用 使用 auto 定义迭 ...
- 【Stream流学习】Java 8 新特性|Collectors.joining() 案例详解
[辰兮要努力]:hello你好我是辰兮,很高兴你能来阅读,昵称是希望自己能不断精进,向着优秀程序员前行! 博客来源于项目以及编程中遇到的问题总结,偶尔会有读书分享,我会陆续更新Java前端.后台.数据 ...
- Android Q(10.0)版本新特性以及兼容性适配
北京时间2019年3月14日Google正式对外发布Android Q Beta 1及预览版SDK,这意味着安卓开发者们又即将迎来一年一度的新版本适配工作了.Android Q 为开发者们带来了许多新 ...
- Java新特性之Nashorn的实例详解
Nashorn是什么 Nashorn,发音"nass-horn",是德国二战时一个坦克的命名,同时也是java8新一代的javascript引擎--替代老旧,缓慢的Rhino,符合 ...
- 19c 新特性: Hint Usage Reports详解
老张拉呱:thomas zhang,甲骨文云平台事业部资深技术顾问,2008年加入甲骨文公司数据库咨询部门,10+年甲骨文解决方案咨询支持经验,资深系统工程师.Oracle OCM认证专家,具有丰富的 ...
- mysql按升序创建索引_MySQL8新特性:降序索引详解
前言 MySQL 8.0终于支持降序索引了.其实,从语法上,MySQL 4就支持了,但正如官方文档所言,"they are parsed but ignored",实际创建的还是升 ...
最新文章
- 如何设计可自学习的五子棋AI?
- C言语for轮回语句
- 数字证书及在WCF中的应用
- Tomcat已经启动
- ssh-add ssh-agent使用
- base64文件上传后台处理
- Redux Vuex
- kylin与superset集成实现数据可视化
- javaSE学习 访问控制
- 动图:七分钟帮你理解什么是KMP算法?
- 华为主题引擎怎么下载_华为搜索引擎app
- 蚂蚁区块链BaaS平台应用开发指南(五):JS SDK的接入
- 大环境之下软件测试行业趋势能否上升?
- 电脑计算机在哪里关闭杀毒模式,电脑一开杀毒软件就关闭的解决方法
- 计算机科学计算矩阵答案,计算机科学计算答案.doc
- 有么有免费的logo生成器?大家看中的是哪款?
- 推荐一个rss源搜索引擎
- Unit Test Harness(用具)应该具备什么功能?
- 海盗王客户端断线自动重连的解决方案
- arcgis按要求删除点位
热门文章
- 前沿 | 中国中小银行都是如何展开数字化转型的?
- centos使用yum命令安装报错 File /usr/bin/yum, line 30 except KeyboardInterrupt, e:
- 计算机带e的科学计数法,带e的科学计数法
- [TJOI2017]可乐
- 企业想变换ISO9001发证机构,需要怎么操作?
- 已知铁的密度是7.86g/cm3,从键盘输入铁球的直径(单位:cm),计算出铁球的表面积和质量,并输出(保留两位小数)(使用标准库math模块得到pi的值)。...
- 音频采样及编解码——LPCM 、ADPCM、G711、G726
- POSTGIS路径规划的简单配置(数据库配置)
- (四十三)参数法与非参数法(历史模拟法)计算VaR
- Docker专题(二)-Docker安装与启动