阿里云EMR自2020年推出Remote Shuffle Service(RSS)以来,帮助了诸多客户解决Spark作业的性能、稳定性问题,并使得存算分离架构得以实施,与此同时RSS也在跟合作方小米的共建下不断演进。本文将介绍RSS的最新架构,在小米的实践,以及开源。

一 问题回顾

Shuffle是大数据计算中最为重要的算子。首先,覆盖率高,超过50%的作业都包含至少一个Shuffle[2]。其次,资源消耗大,阿里内部平台Shuffle的CPU占比超过20%,LinkedIn内部Shuffle Read导致的资源浪费高达15%[1],单Shuffle数据量超100T[2]。第三,不稳定,硬件资源的稳定性CPU>内存>磁盘≈网络,而Shuffle的资源消耗是倒序。OutOfMemory和Fetch Failure可能是Spark作业最常见的两种错误,前者可以通过调参解决,而后者需要系统性重构Shuffle。

传统Shuffle如下图所示,Mapper把Shuffle数据按PartitionId排序写盘后交给External Shuffle Service(ESS)管理,Reducer从每个Mapper Output中读取属于自己的Block。

传统Shuffle存在以下问题。

  • 本地盘依赖限制了存算分离。存算分离是近年来兴起的新型架构,它解耦了计算和存储,可以更灵活地做机型设计:计算节点强CPU弱磁盘,存储节点强磁盘强网络弱CPU。计算节点无状态,可根据负载弹性伸缩。存储端,随着对象存储(OSS, S3)+数据湖格式(Delta, Iceberg, Hudi)+本地/近地缓存等方案的成熟,可当作容量无限的存储服务。用户通过计算弹性+存储按量付费获得成本节约。然而,Shuffle对本地盘的依赖限制了存算分离。
  • 写放大。当Mapper Output数据量超过内存时触发外排,从而引入额外磁盘IO。
  • 大量随机读。Mapper Output属于某个Reducer的数据量很小,如Output 128M,Reducer并发2000,则每个Reducer只读64K,从而导致大量小粒度随机读。对于HDD,随机读性能极差;对于SSD,会快速消耗SSD寿命。
  • 高网络连接数,导致线程池消耗过多CPU,带来性能和稳定性问题。
  • Shuffle数据单副本,大规模集群场景坏盘/坏节点很普遍,Shuffle数据丢失引发的Stage重算带来性能和稳定性问题。

二 RSS发展历程

针对Shuffle的问题,工业界尝试了各种方法,近两年逐渐收敛到Push Shuffle的方案。

1 Sailfish

Sailfish3最早提出Push Shuffle + Partition数据聚合的方法,对大作业有20%-5倍的性能提升。Sailfish魔改了分布式文件系统KFS[4],不支持多副本。

2 Dataflow

Goolge BigQuery和Cloud Dataflow5实现了Shuffle跟计算的解耦,采用多层存储(内存+磁盘),除此之外没有披露更多技术细节。

3 Riffle

Facebook Riffle2采用了在Mapper端Merge的方法,物理节点上部署的Riffle服务负责把此节点上的Shuffle数据按照PartitionId做Merge,从而一定程度把小粒度的随机读合并成较大粒度。

4 Cosco

Facebook Cosco[6]7采用了Sailfish的方法并做了重设计,保留了Push Shuffle + Parititon数据聚合的核心方法,但使用了独立服务。服务端采用Master-Worker架构,使用内存两副本,用DFS做持久化。Cosco基本上定义了RSS的标准架构,但受到DFS的拖累,性能上并没有显著提升。

5 Zeus

Uber Zeus[8]9同样采用了去中心化的服务架构,但没有类似etcd的角色维护Worker状态,因此难以做状态管理。Zeus通过Client双推的方式做多副本,采用本地存储。

6 RPMP

Intel RPMP10依靠RDMA和PMEM的新硬件来加速Shuffle,并没有做数据聚合。

7 Magnet

LinkedIn Magnet1融合了本地Shuffle+Push Shuffle,其设计哲学是"尽力而为",Mapper的Output写完本地后,Push线程会把数据推给远端的ESS做聚合,且不保证所有数据都会聚合。受益于本地Shuffle,Magnet在容错和AE的支持上的表现更好(直接Fallback到传统Shuffle)。Magnet的局限包括依赖本地盘,不支持存算分离;数据合并依赖ESS,对NodeManager造成额外压力;Shuffle Write同时写本地和远端,性能达不到最优。Magnet方案已经被Apache Spark接纳,成为默认的开源方案。

8 FireStorm

FireStorm11混合了Cosco和Zeus的设计,服务端采用Master-Worker架构,通过Client多写实现多副本。FireStorm使用了本地盘+对象存储的多层存储,采用较大的PushBlock(默认3M)。FireStorm在存储端保留了PushBlock的元信息,并记录在索引文件中。FireStorm的Client缓存数据的内存由Spark MemoryManager进行管理,并通过细颗粒度的内存分配(默认3K)来尽量避免内存浪费。

从上述描述可知,当前的方案基本收敛到Push Shuffle,但在一些关键设计上的选择各家不尽相同,主要体现在:

  1. 集成到Spark内部还是独立服务。
  2. RSS服务侧架构,选项包括:Master-Worker,含轻量级状态管理的去中心化,完全去中心化。
  3. Shuffle数据的存储,选项包括:内存,本地盘,DFS,对象存储。
  4. 多副本的实现,选项包括:Client多推,服务端做Replication。

阿里云RSS12由2020年推出,核心设计参考了Sailfish和Cosco,并且在架构和实现层面做了改良,下文将详细介绍。

三 阿里云RSS核心架构

针对上一节的关键设计,阿里云RSS的选择如下:

  1. 独立服务。考虑到将RSS集成到Spark内部无法满足存算分离架构,阿里云RSS将作为独立服务提供Shuffle服务。
  2. Master-Worker架构。通过Master节点做服务状态管理非常必要,基于etcd的状态状态管理能力受限。
  3. 多种存储方式。目前支持本地盘/DFS等存储方式,主打本地盘,将来会往分层存储方向发展。
  4. 服务端做Replication。Client多推会额外消耗计算节点的网络和计算资源,在独立部署或者服务化的场景下对计算集群不友好。

下图展示了阿里云RSS的关键架构,包含Client(RSS Client, Meta Service),Master(Resource Manager)和Worker三个角色。Shuffle的过程如下:

  1. Mapper在首次PushData时请求Master分配Worker资源,Worker记录自己所需要服务的Partition列表。
  2. Mapper把Shuffle数据缓存到内存,超过阈值时触发Push。
  3. 隶属同个Partition的数据被Push到同一个Worker做合并,主Worker内存接收到数据后立即向从Worker发起Replication,数据达成内存两副本后即向Client发送ACK,Flusher后台线程负责刷盘。
  4. Mapper Stage运行结束,MetaService向Worker发起CommitFiles命令,把残留在内存的数据全部刷盘并返回文件列表。
  5. Reducer从对应的文件列表中读取Shuffle数据。

阿里云RSS的核心架构和容错方面的介绍详见[13],本文接下来介绍阿里云RSS近一年的架构演进以及不同于其他系统的特色。

1 状态下沉

RSS采用Master-Worker架构,最初的设计中Master统一负责集群状态管理和Shuffle生命周期管理。集群状态包括Worker的健康度和负载;生命周期包括每个Shuffle由哪些Worker服务,每个Worker所服务的Partition列表,Shuffle所处的状态(Shuffle Write,CommitFile,Shuffle Read),是否有数据丢失等。维护Shuffle生命周期需要较大数据量和复杂数据结构,给Master HA的实现造成阻力。同时大量生命周期管理的服务调用使Master易成为性能瓶颈,限制RSS的扩展性。

为了缓解Master压力,我们把生命周期状态管理下沉到Driver,由Application管理自己的Shuffle,Master只需维护RSS集群本身的状态。这个优化大大降低Master的负载,并使得Master HA得以顺利实现。

2 Adaptive Pusher

在最初的设计中,阿里云RSS跟其他系统一样采用Hash-Based Pusher,即Client会为每个Partition维护一个(或多个[11])内存Buffer,当Buffer超过阈值时触发推送。这种设计在并发度适中的情况下没有问题,而在超大并发度的情况下会导致OOM。例如Reducer的并发5W,在小Buffer[13]的系统中(64K)极端内存消耗为64K5W=3G,在大Buffer[11]的系统中(3M)极端内存消耗为3M5W=146G,这是不可接受的。针对这个问题,我们开发了Sort-Based Pusher,缓存数据时不区分Partition,当总的数据超过阈值(i.e. 64M)时对当前数据按照PartitionId排序,然后把数据Batch后推送,从而解决内存消耗过大的问题。

Sort-Based Pusher会额外引入一次排序,性能上比Hash-Based Pusher略差。我们在ShuffleWriter初始化阶段根据Reducer的并发度自动选择合适的Pusher。

3 磁盘容错

出于性能的考虑,阿里云RSS推荐本地盘存储,因此处理坏/慢盘是保证服务可靠性的前提。Worker节点的DeviceMonitor线程定时对磁盘进行检查,检查项包括IOHang,使用量,读写异常等。此外Worker在所有磁盘操作处(创建文件,刷盘)都会捕捉异常并上报。IOHang、读写异常被认为是Critical Error,磁盘将被隔离并终止该磁盘上的存储服务。慢盘、使用量超警戒线等异常仅将磁盘隔离,不再接受新的Partition存储请求,但已有的Partition服务保持正常。在磁盘被隔离后,Worker的容量和负载将发生变化,这些信息将通过心跳发送给Master。

4 滚动升级

RSS作为常驻服务,有永不停服的要求,而系统本身总在向前演进,因此滚动升级是必选的功能。尽管通过Sub-Cluster部署方式可以绕过,即部署多个子集群,对子集群做灰度,灰度的集群暂停服务,但这种方式依赖调度系统感知正在灰度的集群并动态修改作业配置。我们认为RSS应该把滚动升级闭环掉,核心设计如下图所示。Client向Master节点的Leader角色(Master实现了HA,见上文)发起滚动升级请求并把更新包上传给Leader,Leader通过Raft协议修改状态为滚动升级,并启动第一阶段的升级:升级Master节点。Leader首先升级所有的Follower,然后替换本地包并重启。在Leader节点改变的情况下,升级过程不会中断或异常。Master节点升级结束后进入第二阶段:Worker节点升级。RSS采用滑动窗口做升级,窗口内的Worker尽量优雅下线,即拒绝新的Partition请求,并等待本地Shuffle结束。为了避免等待时间过长,会设置超时时间。此外,窗口内的Worker选择会尽量避免同时包含主从两副本以降低数据丢失的概率。

5 混乱测试框架

对于服务来说,仅依靠UT、集成测试、e2e测试等无法保证服务可靠性,因为这些测试无法覆盖线上复杂环境,如坏盘、CPU过载、网络过载、机器挂掉等。RSS要求在出现这些复杂情况时保持服务稳定,为了模拟线上环境,我们开发了仿真(混乱)测试框架,在测试环境中模拟线上可能出现的异常,同时保证满足RSS运行的最小运行环境,即至少3个Master节点和2个Worker节点可用,并且每个Worker节点至少有一块盘。我们持续对RSS做此类压力测试。

仿真测试框架架构如下图所示,首先定义测试Plan来描述事件类型、事件触发的顺序及持续时间,事件类型包括节点异常,磁盘异常,IO异常,CPU过载等。客户端将Plan提交给Scheduler,Scheduler根据Plan的描述给每个节点的Runner发送具体的Operation,Runner负责具体执行并汇报当前节点的状态。在触发Operation之前,Scheduler会推演该事件发生产生的后果,若导致无法满足RSS的最小可运行环境,将拒绝此事件。

我们认为仿真测试框架的思路是通用设计,可以推广到更多的服务测试中。

6 多引擎支持

Shuffle是通用操作,不跟引擎绑定,因此我们尝试了多引擎支持。当前我们支持了Hive+RSS,同时也在探索跟流计算引擎(Flink),MPP引擎(Presto)结合的可能性。尽管Hive和Spark都是批计算引擎,但Shuffle的行为并不一致,最大的差异是Hive在Mapper端做排序,Reducer只做Merge,而Spark在Reducer端做排序。由于RSS暂未支持计算,因此需要改造Tez支持Reducer排序。此外,Spark有干净的Shuffle插件接口,RSS只需在外围扩展,而Tez没有类似抽象,在这方面也有一定侵入性。

当前大多数引擎都没有Shuffle插件化的抽象,需要一定程度的引擎修改。此外,流计算和MPP都是上游即时Push给下游的模式,而RSS是上游Push,下游Pull的模式,这两者如何结合也是需要探索的。

7 测试

我们对比了阿里云RSS、Magent及开源系统X。由于大家的系统还在向前演进,因此测试结果仅代表当前。

测试环境

Header 1: ecs.g6e.4xlarge, 16 2.5GHz/3.2GHz, 64GiB, 10Gbps
Worker 3: ecs.g6e.8xlarge, 32 2.5GHz/3.2GHz, 128GiB, 10Gbps

阿里云RSS vs. Magnet

5T Terasort的性能测试如下图所示,如上文描述,Magent的Shuffle Write有额外开销,差于RSS和传统做法。Magent的Shuffle Read有提升,但差于RSS。在这个Benchmark下,RSS明显优于另外两个,Magent的e2e时间略好于传统Shuffle。

阿里云RSS vs. 开源系统X

RSS跟开源系统X在TPCDS-3T的性能对比如下,总时间RSS快了20%。

稳定性

在稳定性方面,我们测试了Reducer大规模并发的场景,Magnet可以跑通但时间比RSS慢了数倍,System X在Shuffle Write阶段报错。

四 阿里云RSS在小米的实践

1 现状及痛点

小米的离线集群以Yarn+HDFS为主,NodeManager和DataNode混合部署。Spark是主要的离线引擎,支撑着核心计算任务。Spark作业当前最大的痛点集中在Shuffle导致的稳定性差,性能差和对存算分离架构的限制。在进行资源保证和作业调优后,作业失败原因主要归结为Fetch Failure,如下图所示。由于大部分集群使用的是HDD,传统Shuffle的高随机读和高网络连接导致性能很差,低稳定性带来的Stage重算会进一步加剧性能回退。此外,小米一直在尝试利用存算分离架构的计算弹性降低成本,但Shuffle对本地盘的依赖造成了阻碍。

2 RSS在小米的落地

小米一直在关注Shuffle优化相关技术,21年1月份跟阿里云EMR团队就RSS项目建立了共创关系,3月份第一个生产集群上线,开始接入作业,6月份第一个HA集群上线,规模达100+节点,9月份第一个300+节点上线,集群默认开启RSS,后续规划会进一步扩展RSS的灰度规模。

在落地的过程,小米主导了磁盘容错的开发,大大提高了RSS的服务稳定性,技术细节如上文所述。此外,在前期RSS还未完全稳定阶段,小米在多个环节对RSS的作业进行了容错。在调度端,若开启RSS的Spark作业因Shuffle报错,则Yarn的下次重试会回退到ESS。在ShuffleWriter初始化阶段,小米主导了自适应Fallback机制,根据当前RSS集群的负载和作业的特征(如Reducer并发是否过大)自动选择RSS或ESS,从而提升稳定性。

3 效果

接入RSS后,Spark作业的稳定性、性能都取得了显著提升。之前因Fetch Failure失败的作业几乎不再失败,性能平均有20%的提升。下图展示了接入RSS前后作业稳定性的对比。

ESS:

RSS:

下图展示了接入RSS前后作业运行时间的对比。

ESS:

RSS:

在存算分离方面,小米海外某集群接入RSS后,成功上线了1600+ Core的弹性集群,且作业运行稳定。

在阿里云EMR团队及小米Spark团队的共同努力下,RSS带来的稳定性和性能提升得到了充分的验证。后续小米将会持续扩大RSS集群规模以及作业规模,并且在弹性资源伸缩场景下发挥更大的作用。

五 开源

重要的事说三遍:“阿里云RSS开源啦!” X 3

git地址: https://github.com/alibaba/RemoteShuffleService

开源代码包含核心功能及容错,满足生产要求。

计划中的重要Feature:

  1. AE
  2. Spark多版本支持
  3. Better 流控
  4. Better 监控
  5. Better HA
  6. 多引擎支持

欢迎各路开发者共建!

六 Reference

[1]Min Shen, Ye Zhou, Chandni Singh. Magnet: Push-based Shuffle Service for Large-scale Data Processing. VLDB 2020.
[2]Haoyu Zhang, Brian Cho, Ergin Seyfe, Avery Ching, Michael J. Freedman. Riffle: Optimized Shuffle Service for Large-Scale Data Analytics. EuroSys 2018.
[3]Sriram Rao, Raghu Ramakrishnan, Adam Silberstein. Sailfish: A Framework For Large Scale Data Processing. SoCC 2012.
[4]KFS. http://code.google.com/p/kosmosfs/
[5]Google Dataflow Shuffle. https://cloud.google.com/blog/products/data-analytics/how-distributed-shuffle-improves-scalability-and-performance-cloud-dataflow-pipelines
[6]Cosco: An Efficient Facebook-Scale Shuffle Service. https://databricks.com/session/cosco-an-efficient-facebook-scale-shuffle-service
[7]Flash for Apache Spark Shuffle with Cosco. https://databricks.com/session_na20/flash-for-apache-spark-shuffle-with-cosco
[8]Uber Zeus. https://databricks.com/session_na20/zeus-ubers-highly-scalable-and-distributed-shuffle-as-a-service
[9]Uber Zeus. https://github.com/uber/RemoteShuffleService
[10]Intel RPMP. https://databricks.com/session_na20/accelerating-apache-spark-shuffle-for-data-analytics-on-the-cloud-with-remote-persistent-memory-pools
[11]Tencent FireStorm. https://github.com/Tencent/Firestorm
[12]Aliyun RSS在趣头条的实践. https://developer.aliyun.com/article/779686
[13]Aliyun RSS架构. https://developer.aliyun.com/article/772329

作者 | 一锤、明济、紫槿

原文链接

本文为阿里云原创内容,未经允许不得转载。

阿里云 EMR Remote Shuffle Service 在小米的实践相关推荐

  1. 阿里云EMR Remote Shuffle Service在小米的实践

    简介:阿里云EMR自2020年推出Remote Shuffle Service(RSS)以来,帮助了诸多客户解决Spark作业的性能.稳定性问题,并使得存算分离架构得以实施,与此同时RSS也在跟合作方 ...

  2. 阿里云EMR 2.0:兼容开源,贡献开源,超越开源

    摘要:本文整理自阿里云资深技术专家吴威(无谓)在 阿里云EMR2.0线上发布会 的分享.本篇内容主要分为三个部分: 1.兼容开源阶段 2.贡献开源阶段 3.超越开源阶段 兼容开源阶段 开源这个词在最近 ...

  3. 降本增效利器!趣头条Spark Remote Shuffle Service最佳实践

    王振华,趣头条大数据总监,趣头条大数据负责人 曹佳清,趣头条大数据离线团队高级研发工程师,曾就职于饿了么大数据INF团队负责存储层和计算层组件研发,目前负责趣头条大数据计算层组件Spark的建设 范振 ...

  4. 【阿里云EMR实战篇】以EMR测试集群版本为例,详解 Flink SQL Client 集成 Hive 使用步骤

    简介: 以测试集群版本为例(EMR-4.4.1)-- Flink SQL Client 集成 Hive 使用文档 作者:林志成,阿里云EMR产品团队技术支持,拥有多年开源大数据经验 1.以测试集群版本 ...

  5. 阿里云EMR异步构建云HBase二级索引

    一.非HA EMR构建二级索引 云HBase借助Phoenix实现二级索引功能,对于Phoenix二级索引的详细介绍可参考https://yq.aliyun.com/articles/536850?s ...

  6. 阿里云 EMR Delta Lake 在流利说数据接入中的架构和实践

    简介: 为了消灭数据孤岛,企业往往会把各个组织的数据都接入到数据湖以提供统一的查询或分析.本文将介绍流利说当前数据接入的整个过程,期间遇到的挑战,以及delta在数据接入中产生的价值. 背景 流利说目 ...

  7. 阿里云EMR集群搭建及使用

    目录 1.简介 1.什么是EMR 2.组成 3.与自建hadoop集群对比 4.产品架构 2.使用 1.创建EMR集群 1.登录EMR on ECS控制台 2.软件设置 3.硬件设置 3.基础配置 2 ...

  8. 阿里云EMR 2.0:重新定义新一代开源大数据平台

    摘要:本文整理自阿里云高级产品专家何源(荆杭)在 阿里云EMR2.0线上发布会 的分享.本篇内容主要分为三个部分: 开源大数据的痛点及EMR产品历程 EMR2.0 新特征 总结 一.开源大数据的痛点及 ...

  9. 阿里云 EMR 基于 Apache DolphinScheduler 产品技术实践和社区贡献

    摘要:本文整理自阿里云 EMR 数据开发团队负责人孙一凡(Evans 忆梵),在 Spark&DS Meetup 的分享.本篇内容主要分为四个部分: 1.我们是谁 2. 为什么选择 Dolph ...

最新文章

  1. 空气输入法!浙大最新研究:空中动动手指,就能给智能手表输入文本
  2. linux7提示软件安装源位置不对,详解 RHEL7.1 yum源配置与软件安装
  3. 继Facebook开源PyTorch3D后,谷歌开源TensorFlow 3D场景理解库
  4. spring-MVC源码解读(一)
  5. 唐诗三百首加密软件如何使用_视频加密一机一码软件该如何选择?有哪些因素影响?...
  6. WPF学习笔记-第二周【基本笔刷】
  7. python3 绝对值_Python的绝对值最大的数字,python
  8. php连接mysql数据库输出_PHP连接并输出数据库数据实例代码
  9. ruby和python比较_ruby和python哪个强大
  10. [Swift]SwiftyJSON的使用:解析JSON
  11. ETL工具kettle 日志表配置及工作流程
  12. 人脸识别主要算法原理
  13. 过年发生的,WinM7推出,MeeGo诞生,iPhone香肠
  14. 在windbg调试的时候 针对ecx和eax说明
  15. 错误提示:“ReferenceError: require is not defined in ES module scope, you can use impor”解决方法
  16. iOS开发 区分Home键和锁屏键事件
  17. Tyvj 1047 乘积最大
  18. 金融经济学(王江)期末梳理 第十三章 资本资产定价模型(CAPM)
  19. 【无标题】小程序开发网站建设前端网页制作美工H5
  20. 离散数学第六章 格和布尔代数

热门文章

  1. python 32位和64位共存以实现打包32位exe和64位exe
  2. 面试编程题1:给定一个字符串,统计出每个字母出现次数
  3. 超宽带射频信号高速记录回放系统—全球顶级信号记录回放带宽6GB/S!
  4. 关于duilib的按钮响应函数/C360SafeFrameWnd
  5. opencv控制鼠标
  6. 蛇形矩阵 java_Java打印蛇形矩阵
  7. 程序员如何减掉小肚腩
  8. 局域网限速软件_百度网盘竟良心发现,下载不再限速?
  9. Golang解决TCP粘包拆包问题
  10. 解决:关闭移动数据,接收彩信,不能成功接收下载彩信