Thrill: 基于C++的高性能分布式批处理算法

  • 摘要
  • 1、介绍
    • 概述
    • 我们的贡献
    • A 相关工作
  • 2、Thrill设计
    • A. 分布式的不可变数组
    • B. 示例:WordCount
    • C. DIA操作总览
    • D. 为什么是数组?
    • E. 数据流图的实现
    • F. 数据、网络和I/O层
    • G. 归约、分组和排序的实现细节
      • 1)Reduce操作
      • 2)分组操作
      • 3)分布式排序
  • 3. 实验结果
    • A. 微观基准
    • B. 平台
    • C. 结果
  • 4、结论和未来工作

摘要

我们介绍了Thrill的设计和性能评估,它是一个通用大数据处理框架的原型,具有方便的数据流式编程接口。Thrill与Apache-Spark和Apache-Flink有些相似,但有两个主要区别。首先,Thrill是基于C++的,它具有直接的本机代码编译、更高速缓存友好的内存布局和显式内存管理,具有性能优势。特别是,Thrill使用模板元编程将随后的本地操作链编译成一个单独的二进制例程,而不需要中间缓冲,并且具有最少的间接操作。其次,Thrill使用数组而不是集合作为其主要数据结构,这样可以实现额外的操作,如排序、前缀和、窗口扫描或组合多个数组的相应字段(压缩)。我们使用HiBench套件中的五个内核将Thrill与Apache Spark和Apache Flink进行了比较。Thrill总是比其他框架更快,而且通常是其他框架的几倍。同时,源代码具有相似的简单性和抽象性。

1、介绍

在本文中,我们提出了一种新的开源C++框架,用于分布式批处理数据的处理。由于近年来单个处理器内核的时钟速度停滞不前,对并行和分布式算法的需求不容忽视。同时,我们也经历了数据量的爆炸式增长,使得可扩展的分布式数据分析成为越来越多应用领域的瓶颈。我们希望在弥合“大数据”处理两种传统方案之间的差距方面迈出一步。

一方面,在学术界和高性能计算(HPC)中,分布式算法通常是在C/C++中手工编写的,并使用MPI进行显式通信。以实现困难为代价实现了高效率。另一方面,软件行业的全球参与者创建了自己的生态系统来满足他们的数据分析需求。谷歌在2004年推广了MapReduce模型,并描述了它们的内部实现。Apache Hadoop和最近的Apache Spark和Apache Flink作为基于Scala/Java的开放源代码解决方案,已经引起了人们的注意。这些框架提供了一个简单的编程接口,并承诺自动工作并行化和调度、自动数据分发和自动容错。尽管大多数基准测试都强调了这些框架的可伸缩性,但底层效率却被证明是缺乏的,且CPU常常成为性能瓶颈。

Thrill弥合这一鸿沟的方法是使用一个可伸缩算法的原语库,如Map、ReduceByKey、Sort和Window,这些原语可以有效地结合起来,使用流水线数据流构建更大的复杂算法。Thrill是使用C++ 14编写的,具有最小的外部依赖性,并可在Linux、Mac OS和Windows上编译。通过使用C++,Thrill能够利用编译时优化、模板元编程和显式内存管理。Thrill能够有效地处理固定长度的项目,如单个字符或固定维度向量,而不受对象开销的影响,这是由于C++的零开销抽象。它将操作的数据类型视为不透明的,并利用模板编程用用户定义函数(UDF)实例化操作。例如,排序操作的比较函数被编译成实际的内部排序和合并算法(类似于std::sort)。同时,Thrill不试图优化操作的执行顺序,因为这需要熟知数据内容以及UDFs如何操作数据。

与大多数MPI程序类似,Thrill程序运行在批量同步管理器中。Thrill主要关注快速的内存计算,但在需要时透明地使用外部内存。Thrill使用的函数式编程风格实现了简单的并行化,这对于共享内存并行性也非常有效。因此,由于可伸缩原语的限制,Thrill程序运行在大范围的同构并行系统上。

利用C++以达到实现高性能分布式算法的目的。基于JVM的框架通常很慢,因为解释字节码的开销,即使即时(just-in-time,JIT)编译已经使这一领域趋于平坦,但由于对象间接寻址和垃圾收集,Java/Scala必须保持较低的缓存效率。在处理大量数据时,可伸缩的分布式应用程序的最终瓶颈是网络的带宽。通过使用更多优化的实现,更多的CPU时间留给压缩、重复数据消除和其他算法以减少通信。在较小的网络中,CPU常常是瓶颈,对于大多数应用程序来说,一个小的集群就足够了。

使用C++必须明确地进行内存管理。虽然这对于比垃圾收集内存更可预测和有更高的性能,但它使编程更加困难。然而,C++ 11在这方面的编程工作量已大大减轻。

虽然可伸缩算法可使用更多的硬件保证更高的性能,但从并行共享内存到分布式的性能影响是巨大的。原因是通信延迟和带宽问题。这种网络开销和大数据框架的额外管理开销,使得实现加速的硬件成本过高。由于无法避免网络开销,Thrill不能达到零开销。但是,通过重叠计算和通信,以及采用二进制优化的机器码,Thrill可实现较小的开销。

Thrill是BSD 2条款许可下的开源软件,可作为GitHub上的社区项目使用。它目前有超过5万2千行的C++代码,大约有十几个开发者做出了贡献。

概述

本节的其余部分将介绍相关工作,重点介绍Spark和Flink。第二节讨论了Thrill的设计,特别是它的API和基本原理。我们在第II-B节中给出了一个完整的WordCount示例,然后概述了当前的操作组合及其实现的详细信息。在第三节中,我们展示了基于PageRank和KMeans等五个微基准的Thrill、Spark和Flink的对比实验结果。第四节总结并对今后的工作提出展望。

我们的贡献

Thrill表明,随着C++ 11 lambda表达式的出现,可使用与Spark或FLink等当前流行的框架相似的抽象的灵活和方便的API,使用C++进行大数据处理变得非常有用。这不仅可利用C++的性能优势,而且允许我们通过复杂模板元编程将本地OPR序列的序列透明地编译成单个二进制代码。通过使用数组作为主要数据类型,我们可以实现额外的基本操作,这些操作在传统的基于多集的系统中实现起来更复杂、成本更高。我们的实验评估表明,即使是当前的原型实现,与Spark和Flink对比也有很大的性能优势。

A 相关工作

由于“大数据”主题的重要性和炒作性,近年来出现了大量的分布式数据处理框架。它们涵盖了这一主题的许多不同方面,如数据仓库和批处理、流聚合、交互式查询、专用图、和机器学习框架。

2004年,Google建立了一个易于使用的可伸缩数据分析接口MapReduce。他们的论文催生了一个完整的研究领域,即如何使用just-map来表达分布式算法,并尽可能使用少地轮数。很快,一个开源的MapReduce框架Apache Hadoop问世,这个框架是用Java为商业硬件集群编写的。从这些程序集合中最值得注意的是Hadoop分布式文件系统(HDFS),它是MapReduce容错数据管理的关键。随后,人们做了大量的学术工作来优化Hadoop的各个方面,比如调度和数据洗牌。

MapReduce和Hadoop由于其简单的编程接口而非常成功,同时也存在一些缺陷。例如,由于MapReduce轮次次数很多,迭代计算非常慢,每个轮次可能需要完整的数据交换和磁盘IO。最近的一些框架,如Apache Spark和Apache Flink,提供了一个更通用的接口来提高可用性和性能。

Apache Spark在一个称为弹性分布式数据集(RDD)的抽象上操作。这种抽象为用户提供了一个易于使用的接口,该接口由许多确定的粗粒度操作组成。每个操作都可以分类为转换或操作。转换是一种懒惰的操作,它在给定一个RDD(如map或join)的情况下定义新的RDD。操作将计算出的结果返回给用户程序,例如从外部存储器中计数、收集或读/写数据。当一个动作触发计算时,Spark检查先前调用的转换序列并识别所谓的执行阶段。Spark运行在一个主从模式的架构中。当驱动程序在主节点上运行时,实际的计算发生在具有基于块的工作分区和调度系统的工作节点上。Spark可以在主存中维护已经计算过的rdd,以便将来的操作可以重用,从而加快迭代计算的速度。

在更新的版本中,Spark又添加了两个api:DataFrames和dataset。这两种语言都为类似于SQL的高级声明性编程提供了域指定语言,这使得Spark能够优化查询执行计划。更进一步说,除了最初的Scala/Java程序之外,Spark还可以在线生成优化的查询字节码。优化后的字节码可以使用更有效的直接访问数据的方法,不再需要将数据存储为JVM对象,因此可以避免垃圾收集。DataFrame引擎构建在原始RDD处理接口之上。

Apache Flink源于一个平流层的研究项目,目前正在从一个学术项目向工业项目发展。虽然Flink与Spark有许多共同的想法,比如主工作模型、惰性操作和迭代计算,但它紧密地集成了并行数据库系统中已知的概念。Flink的核心接口是一种域指定的声明性语言。此外,弗林克的重点已转向流式处理,而不是批处理。

在Flink中,优化器接受一个用户程序并生成一个逻辑运算符图。然后,该框架执行基于规则和成本的优化,如操作的重新排序、本地操作的流水线、算法的选择以及不同数据交换模式的评估,以确定Flink认为最适合给定用户程序和集群配置的执行计划。Flink基于一个类似于并行数据库系统的流水线执行引擎,它被扩展为集成流操作和丰富的窗口语义。通过仅对更改的数据执行增量迭代,以及跨迭代将计算放在同一个工作进程上,可以加快迭代计算的速度。受Chandy-Lamport快照启发,容错通过连续地获取分布式数据流和操作状态的快照来实现。Flink还拥有一个独立于JVM垃圾收集器的内存管理系统,以获得更高的性能和更好的内存峰值控制。

Spark和Flink的一些接口有显著不同。Flink的优化器需要对数据对象的组件以及UDF如何对它们进行操作进行内省。这需要对UDF进行许多Scala/Java注释,并且需要间接访问组件的值。与Spark的RDD接口(用户可以使用宿主语言控制流)不同,Flink提供了自定义的迭代操作。因此,在这方面,Flink程序更类似于声明性SQL状态,而不是命令式语言。较新的DataFrame和Dataset接口向Spark引入了类似的概念,并使用自定义代码生成引擎进一步扩展了它们。Spark的核心是一个内存中的批处理引擎,它将流作业作为一系列小批处理执行。相比之下,Flink基于数据库系统中使用的流水线执行引擎,它允许Flink以流水线方式处理流操作,并且比微批处理模型中的延迟更低。此外,Flink支持外部内存算法,而Spark主要是一个内存溢出到外部存储的系统。

总体而言,JVM目前是开源大数据框架的主要平台。从程序员生产率的角度来看,这是可以理解的,但当考虑到C++是性能关键系统的主要语言时,令人惊讶的是,大数据处理本质上是性能关键的。因此,Spark和Flink努力克服JVM的性能损失,例如使用显式的不安全内存操作和生成优化的字节码,以避免对象开销和垃圾收集。我们提出了一个基于C++的框架Thrill,降低系统开销。

2、Thrill设计

Thrill是用C++编写的,并编译成二进制程序。此二进制代码的执行模型类似于MPI程序:一个相同的程序在h个计算机上共同执行。Thrill目前希望所有的机器都有几乎相同的硬件,它在机器之间平衡工作和数据。二进制程序在所有机器上同时启动,并通过网络协议连接到其他机器。Thrill目前支持TCP套接字和MPI作为网络后端。启动过程取决于指定的后端和集群环境。

每台机器称为主机,主机上的每个工作线程称为工作线程。目前,Thrill要求所有主机拥有相同数量的核心c,因此总共有p=h·c个工作线程。此外,每个主机有一个用于网络/数据处理的线程和一个用于异步磁盘I/O的线程。每个主机都有到其他h-1个主机的可靠网络连接,主机和工作线程被枚举为0到h-1和0到p-1。Thrill没有指定的主节点或驱动节点,因为所有通信和计算都是集体完成的。

Thrill目前不提供容错功能。虽然我们的数据流 API允许使用异步检查点平滑地集成容错,但必须更改h个机器的执行模型。

A. 分布式的不可变数组


Thrill高级数据流API的核心概念是分布式不可变数组(DIA)。DIA是以某种方式分布在集群上的元素数组。不允许直接访问数组。相反,程序员可以将所谓的DIA操作作为一个整体应用到数组中。这些操作是一组可伸缩的原语,如表1所示,它们可以组成复杂的分布式算法。DIA操作可以通过读取文件来创建DIAs,通过应用用户函数来转换现有DIAs,或者集体计算标量值,用于确定进一步的程序控制流。在一个Thrill程序中,这些操作用于在C++中懒惰地构建DIA数据流图(见图1)。只有在遇到action操作时才会执行数据流图。DIA项的实际存储方式以及在分布式系统上以何种方式执行操作对用户保持透明。


在当前的Thrill原型中,数组通常按顺序均匀地分布在p个工作节点之间。DIAS可以包含任何C++数据类型,只要可以序列化。Thrill包含所有原始类型和大多数STL类型的内置序列化方法;只有自定义的非平凡类需要额外的方法。表I中的每个DIA操作都是作为C++模板类实现的,可以用适当的UDFs实例化。

B. 示例:WordCount


我们现在给出了流行的基准程序WordCount的Thrill源代码,以演示在Thrill中编程是多么容易。程序计算文本中每个单词出现的次数。在Thrill中,WordCount由五个DIA操作组成。

ReadLines(第4行)和WriteLines(第22行)用于从文件系统读取文本和将结果写入文件系统。目前,Thrill使用标准的POSIX 接口来读写磁盘,它需要一个分布式并行文件系统(如NFS、Lustre或Ceph)来为所有计算主机提供一个公共视图。ReadLines需要一个thrill::Context对象,它只对源DIA操作和一组文件是必需的。ReadLines的结果是一个DIA<std::string>,它将文件的每一行作为一个项。文件集是按字典顺序排列的,行集在工作进程中被平均划分。

但是,此DIA没有分配给变量名。相反,我们立即追加一个FlatMap操作(第5行),该操作将每个文本行拆分为单词,并发出一个std::pair<std::string,size_t>(化名为Pair)包含每个单词(word,1)。在本例中,我们使用自定义拆分函数和std::string_view来引用文本行中的字符,并将它们复制到word字符串中。FlatMap lambda函数(第7行)的emit auto参数使Thrill能够通过以下ReduceByKey操作来传送FlatMap。第II-E节讨论了流水线的细节。FlatMap的结果是一个DIA<Pair>,它被分配给变量词对。注意,关键字auto使C++自动推断出词对的适当类型。

然后使用ReduceByKey操作逐字减少(word,1)对。这个DIA操作必须用一个键抽取器(从第14行的对中取出单词)和一个归约函数(将两个具有相同键的对相加,第17行)参数化。目前Thrill使用哈希表实现ReduceByKey,如II-G节中所述,C++在ReduceByKey实例化期间将推断出大多数类型,输入和输出都是隐式的;只有FlatMap才需要指定发出的类型。

ReduceByKey的输出也是一个DIA。我们需要使用映射将这些对转换为可打印字符串(第19-21行),然后使用WriteLines操作将其写入磁盘。同样,映射的返回类型(std::string)是自动推断的,因此映射操作的结果是DIA<std::string>。

请注意,图2中的代码没有明显的并行和分布式的描述。它是延迟构建的数据流图中DIA操作的实现,执行实际的分布式执行。该代码指示C++编译器使用所提供的UDF对这些模板类进行实例化和优化。在运行时,当在DIA数据流图中遇到操作时,这些模板类的对象将按程序创建和计算。

C. DIA操作总览

表1概述了目前由Thrill支持的DIA操作。DIA的不变性支持函数式数据流编程。由于DIA操作可以依赖于其他DIA作为输入,因此它们形成一个有向无环图(DAG),称为DIA数据流图。我们将DIA操作表示为图中的顶点,而有向边表示依赖关系。直观地说,可以将有向边描绘为从一个操作流向下一个操作时的DIA值。

我们将所有的DIA操作分为四类。源操作没有传入的边,并从外部源(如文件、数据库查询)生成DIA,或者仅通过生成整数0…n-1。具有一个或多个传入边并返回DIA的操作进一步分类为局部操作(LOps)和分布式操作(DOps)。LOps的例子是Map或Filter,它将一个函数独立地应用于DIA的每个项。LOP可以在本地和并行执行,工作节点之间不需要任何通信。另一方面,诸如ReduceByKey或Sort之类的DOps可能需要通信和磁盘IO。

第四类是action,它不返回DIA,因此没有传出边。DIA数据流图是惰性构建的,即DIA操作在创建时不会立即执行。Action触发对图的求值并将值返回给用户程序。例如,将DIA写入磁盘或计算所有值之和都是action。通过检查action的结果,用户程序可以确定未来的程序流,例如迭代循环直到满足条件。因此,控制流决策是在C++中以命令循环或递归(主机语言控制流)共同执行的。

初始DIAs可以通过Thrill的源操作生成。Generate通过使用generator函数将每个索引[0,size)映射到一个项来创建DIA。ReadLines和ReadBinary从文件系统读取数据,并用这些数据创建一个DIA。

Thrill的FlatMap LOp对应于MapReduce中的map步骤。在C++中,输入DIAs的每个项被函数F映射到零、一个或多个输出项,这是通过调用每个项的emit函数来完成的,如WordCount示例所示。FlatMap的特殊情况是Map将每个项映射到恰好一个输出;Filter选择输入DIA的子集;BernoulliSample以恒定的概率p独立地对每个项进行采样;LOp集将两个或多个DIA融合为一个DIA,而不考虑项的顺序。相反,DOp Concat通过通信保持输入DIAs的顺序并将它们连接起来。

缓存显式地存储DIA操作的结果,以供以后使用。另一方面,关闭功能管道内容请详见第II-G节。

Thrill的ReduceByKey和GroupByKey DOps实现MapReduce的reduce步骤。在这两种操作中,输入项都是按键进行分组。使用键提取函数k从项中提取键,然后使用哈希函数h映射到工作节点。在ReduceByKey中,关联归约函数r指定如何将两个项合并为一个项。在GroupByKey中,具有特定键的所有项都收集在一个工作节点上,并由组函数g处理。如果可能,ReduceByKey应该是首选项,因为它允许本地归约,从而降低通信量和运行时间。

ReduceByKey和GroupByKey都提供了ToIndex变量,其中输入DIA的每一项都由函数i映射到结果DIA中的索引。生成的DIA的大小必须以n表示。映射到同一索引的项可以使用关联归约函数r进行归约,也可以由组函数g进行处理。DIA中的空槽用中性项填充。Sort使用用户定义的比较函数c对DIA排序,Merge再次使用用户定义的比较函数c合并多个排序的DIA。PrefixSum使用关联函数s计算每个项的前缀和(部分和)。

Zip使用类似于函数式编程语言的Zip函数z将两个或多个DIAs索引组合起来。函数z应用于索引i的所有项,以在索引i处传递新项。Zip函数要求所有DIAs具有相等的长度,但是Thrill还提供了将DIAs剪切为最短或将其填充为最长的变体。ZipWithIndex用全局索引对每个DIA项进行zips。虽然ZipWithIndex可以使用Generate和Zip进行模拟,但是组合的变体可以减少通信量。

窗口保持DIA的顺序,并将所有k个连续项(滑动窗口)传递给函数w,该函数只返回一个项。在FlatWindow变量中,窗口函数f可以发出零个或多个项。Thrill还提供了专门的服务,可以提供k个连续项目的所有不相交窗口。

Sum是一个action,它对DIA中的所有项计算一个关联函数s,并在每个worker上返回结果。默认情况下,Sum使用+。Max和Min是Sum与其他运算符的特化。Size返回一个DIA中的项目数,AllGather在每个worker上以std::vector的形式返回整个DIA。WriteLines和WriteBinary向文件系统写入一个DIA。Execute可用于显式触发DIA操作的计算。

除了触发评估的动作外,Thrill还提供预期action,包括SumFuture、MinFuture、AllGatherFuture等,它们只在DIA数据流图中插入一个动作顶点,而不触发评估。使用ActionFuture,一个人只需一次数据往返就可以计算出多个结果(例如,最小和最大项目)。

表I中列出的当前可伸缩原语DIA操作集不是最终版本,将来可能会根据需要和谨慎添加更多分布式算法原语。在第II-G节中,我们将更详细地描述一些操作的实现。我们还展望了如何加速可伸缩原语的未来工作,这些原语可以作为当前实现的替代品。

D. 为什么是数组?

Thrill的DIA API显然类似于Spark和Flink的数据流语言,它们本身也类似于许多函数式编程语言。但是,我们明确定义了DIAs中要排序的项。在像ReduceByKey这样的操作之后,这个顺序可能是任意的,ReduceByKey将项散列到数组中的索引,但是它们确实有一个顺序。我们的许多操作,如前缀、排序、合并、压缩,特别是窗口,只对有序数据类型有意义。

有序的分布式数组为如何在算法中利用这个顺序提供了条件。本质上,这个顺序将局部性的概念重新引入到分布式数据流编程中。虽然不能直接访问DIA项,但是可以使用窗口函数与作为上下文的相邻项并行迭代它们。在Thrill程序中,一个常见的设计模式是使用Sort或ReduceToIndex排序,然后使用窗口处理它们。此外,如果一个窗口中的计算需要来自多个DIA的上下文,可以首先将它们压缩在一起。

我们期待着未来的工作利用这种有序概念。此外,将Thrill从一维数组扩展到更高维数组、稀疏矩阵或图上,不仅有用,而且在概念上也很有趣,因为这些数据类型具有更复杂的局部性概念。

E. 数据流图的实现

与我们在前面章节中为应用程序编程人员介绍的DIAs相反,分布式项数组通常并不显式存在。相反,DIA仅仅是两个具体DIA操作之间的概念数据流。这种数据流抽象允许我们应用称为流水线或链式的优化。链式通常描述为将一个或多个函数的逻辑组合成单个函数(称为管道)的过程。在Thrill中,我们将所有独立并行的本地操作(FlatMap、Map、Filter和BernoulliSample)和下一个分布式DIA操作的第一个本地计算步骤链接到一个优化的二进制代码块中。通过这种链式操作,我们既减少了它们之间数据流的开销,又减少了操作的总数,并且避免了存储中间显式数组的需要。此外,我们利用C++编译器将组合计算的局部计算与完全优化结合起来,从而减少间接操作的数量到最小,从而提高了缓存效率。本质上,我们使用链式操作将一个大容量同步并行的(BSP)超级操作的所有本地计算合并到一个程序集代码块中。

为了将DIA操作的实现集成到流水线框架中,我们将它们细分为三个部分:Link、Main和Push(参见图3中的示例)。链接部分通过执行一些最终的本地工作(如存储或传输)来处理传入项。此进程关闭管道并生成包含其逻辑的单个可执行代码块。主体部分包含实际的DIA操作逻辑,如排序、同步通信等。最后,推送部分通过发出项目进行进一步处理来表示新管道的开始。根据DIA操作的类型,每个部分可以是空的或执行一些琐碎的操作。

我们以PrefixSum为例来解释这些划分。在链部分,PrefixSum从前面的操作接收items流并按顺序存储它们。在存储它们时,每个工作节点对所有项都保留一个本地总和。在主体部分,工作节点对本地总和执行全局同步的排他前缀和,以计算其项的初始值。然后,在读取项并将其推入下一个操作时,将此本地初始值添加到项中。

链还影响DIA操作之间的数据依赖关系在Thrill的数据流图中的表示方式。由于将局部操作流水线到一个装配块中,所有LOp都与后续的DOp顶点融合。因此,DAG中只保留表示分布式操作的顶点。这个优化的数据流DAG对应于一组BSP supersteps及其数据依赖项,当遇到某个操作时将延迟执行。

执行由Thrill的StageBuilder完成,它在优化的DAG中执行宽度优先的反向搜索,以确定需要计算哪些DIA操作。然后按照拓扑顺序执行收集的顶点,以便在执行之前解析它们的数据依赖关系。通过保持每个顶点的状态,避免了不必要的重新计算,DIA操作通过引用计数自动处理。

我们通过大量使用C++模板编程实现链和执行模型。更准确地说,我们通过使用底层(lambda)函数的静态函子类型将它们链接在一起来组成一个管道。由于这些类型可以通过静态分析推导出来,链接可以在编译时进行,因此链接操作可以优化为汇编代码级别的单流水线函数。最后,所有简单的并行本地操作(如Map、FlatMap等)都会在运行时引入零开销,并与之后DIA操作的链接部分结合起来。

Thrill的链接机制的警告是前面的LoP和DOp(lambda)函数f1,f2等成为了DIA操作模板实例化类型DIA<T,f1,f2,…>的一部分,这通常不是问题,因为用C++ 11,我们可以鼓励使用AUTO关键字而不是使用具体的DIA<T>类型。然而,在迭代或递归算法中,必须更新DIA<T>变量。这些变量只是对实际DIA操作的引用,这些操作是不可变的,但是这些引用必须指向相同的底层DIA操作模板类型。我们通过添加一个名为Collapse的特殊操作来解决这个问题,该操作从DIA<T,f1,f2,…>中构造一个DIA<T>。该操作在数据流DAG中创建一个额外的顶点,该顶点关闭当前管道,存储当前管道,并创建一个新的(空)管道。当编译环境不支持Collapse时将发生编译错误。

在Thrill中,我们进一步采用了数据处理的流水线,在将数据推送到下一个操作时,启用了源DIA存储的consumption。DIA操作可以转换巨大的数据集,但一个简单的实现会从一个DIA读取所有item,将它们全部推送到管道中进行处理,然后释放数据存储。假设下一个操作也存储所有item,则这需要两倍的存储量。但是,在启用consume的情况下,前面DIA操作的存储在处理item时被释放,因此所有item的存储只需要一次,外加一个小的重叠缓冲区。

F. 数据、网络和I/O层

在Thrill的高级DIA API下有几个软件层,它们做实际的数据处理。DIA操作是C++模板类,它们被链接在一起,如II-E节中所描述的那样。这些操作使用数据、网络和IO层存储和传输item。

item必须序列化为字节序列,以便通过网络传输或存储在磁盘上。Thrill包含一个定制的C++序列化框架,其目的是提供高性能和零开销。这是可能的,因为不需要签名和版本控制。一般来说,固定长度的小item(如整数和固定大小的数值向量)以零开销存储。可变长度的item(如字符串和可变长度向量)以其长度作为前缀。复合对象作为其组件的序列存储。

DIA操作处理需要传输或存储,然后读取的item流。这样的item流直接序列化到块的内存缓冲区中,默认情况下该块的大小为2 MB。块中的item存储时不带分隔符或其他每item开销。这是可能的,因为Thrill的序列化方法正确地将光标移到下一item。因此,目前每个块只需要四个整数作为开销,而每个item只需要零个整数。这种高效的块存储格式对于处理小item(如纯整数或字符)非常重要,但是Thrill也可以处理跨越多个块的大blob。

块的序列称为文件,即使它通常存储在主存储器中。DIA操作使用模板BlockReader和BlockWriter类在文件中顺序读/写item。

要将item传输给worker,DIA操作有两种选择。一种是一组与MPI类似的高效同步集合通信原语,如AllReduce、Broadcast和PrefixSum。它们使用相同的序列化框架,主要用于阻止小item的通信,例如,整数AllReduce通常用于计算DIA中的item总数。

第二种是用于异步传输大量item的流。流允许所有工作进程之间的all - all的大容量通信。Thrill包含两个子类型的流,它们在从其他worker接收item的顺序上存在差异。CatStreams严格按照worker 权重排序发送item,而MixStreams则按照从网络接收块的任意顺序发送item。除了使用BlockReader和BlockWriter类在块中传输item外,流还可以将文件的整个范围分散到其他工作进程,而无需在网络层中添加块数据的深副本。通过流分散到同一主机上的工作线程的块中的item通过引用计数“传输”,而不是深度复制。与同一主机上的worker的所有通信都是通过同一进程空间内的共享内存完成的。

Thrill中的所有块都由块池管理。当块不再在任何文件中或被网络系统使用时,将对其进行引用计数并自动删除。块池还跟踪块中使用的内存总量。一旦超过用户定义的限制,块池就会异步地将最近使用最少的块交换到本地磁盘。为了区分哪些块可能被逐出,哪些被数据系统使用,必须固定块才能访问它们的数据。可以异步请求管脚以启用从外部内存的预取。然而,定位块的所有复杂性都隐藏在BlockReader/Writer中,以使DIA操作的实现变得容易。

Thrill将可用的系统内存分为三部分(默认相等):BlockPool内存、DIA操作内存和std::string等用户对象的空闲浮动堆内存。通过重载malloc(),所有内存都会被跟踪,因此用户应用程序不需要特殊的分配器。DIA操作的内部数据结构的内存限制在计算执行期间协商和定义。StageBuilder确定哪些DIA操作在同一阶段,并在它们之间公平地划分分配的内存。对于外部内存支持,操作必须遵守这些内部内存限制,例如,正确调整哈希表和排序缓冲区的大小。

G. 归约、分组和排序的实现细节

除了流水线DIA操作之外,在操作本身中仔细实现核心算法对性能也很重要。大多数操作目前都是直接实现的,未来的工作可能会集中在特定DIA操作的更复杂版本上。由于使用了通用的DIA操作接口,这些未来的实现可以很容易地插入到现有的应用程序中。

1)Reduce操作

由于item可以通过联合或交换的归约操作r:A×A->A立即归约,ReduceByKey和ReduceToIndex使用多级散列表实现。

Thrill区分两个归约阶段:传输前的前阶段和从worker接收item的后阶段。推入Reduce DOp的item首先由键提取程序k:A->K或索引函数i:A->[0,n)(见表一)处理。键空间K或索引空间[0,n)被等分到worker[0;p)上。在前阶段,将每个worker散列并将item插入到p个单独的散列表中的一个,每个散列表的目的地是一个worker。如果哈希表超过其填充因子,则传输其内容。如果找到两个item匹配到同一个键中,则使用r在本地组合它们。

在后期阶段从其他工作进程接收的item将插入到第二级哈希表中。同样,使用r可以立即减少item。为了实现真正的海量数据处理,Thrill可能会在后期将item迁移到外部内存中。第二级哈希表再次被划分为k个单独的表。如果k个表中的任何一个超过了它的填充因子,它的内容就会溢出到一个文件中。当post阶段接收到所有item时,通过选择新的哈希函数并重用哈希表,递归地减少溢出的文件。

前期和后期阶段使用自定义的线性探测哈希表,内置冲突减少功能。一个大内存段用于p个单独的哈希表。最初,每个分区只填充一小部分区域,用于节省分配时间。当哈希表被刷新或溢出时,其分配的大小将加倍,直到达到StageBuilder指定的内存限制。

2)分组操作

GroupByKey和GroupToIndex基于排序和多路合并。推送到Group-DOp中的item首先由键提取程序k:A->K或索引函数i:A->[0,n)处理,输出空间K或[0,n)均匀地分布到p个worker上。在确定目标worker之后,item将立即通过流传输到相应的工作进程。每个worker将所有接收到的item存储在内存里的向量中。一旦向量已满或堆内存耗尽,向量将按键排序,并序列化为一个可交换到外部内存的文件。一旦接收到所有item,就可以使用有效的多路合并来合并已排序的运行。已排序item的流按照键分成子序列,传递到组函数g:iterable(A)->B,作为一个多路合并迭代器。

3)分布式排序

Sort操作按照比较函数定义的全局顺序重新排列所有DIA item。在每个工作进程的链接步骤中,所有本地传入的item都会写入一个文件。同时,使用储层取样抽取随机样本,并在所有item可见后发送给worker 0。在主要部分,Thrill使用超标量样本排序在worker之间重新分配item:worker 0接收所有item样本,在本地对它们排序,选择p-1等距拆分器,并将拆分器广播回所有worker。它们使用p个bucket构建一个平衡的二叉树,以确定log p次比较中确定每个item的目标worker。由于超级标量样本排序要求存储桶数为2的幂,因此树中根据需要填充了哨兵。然后从文件中读取item,使用拆分器树进行分类,并通过流传输到相应的worker中。当worker在接收item时达到其内存限制,这些item将被排序并写入文件。如果创建了多个已排序的文件,则在推送期间合并这些文件。

如果简单地实现排序,则具有许多重复项的数据集可能会导致负载平衡问题。为了减少倾斜,Thrill使用item的全局数组位置来断开关系并确定其接收者。当一个item等于一个拆分器时,只有当且仅当它的全局数组位置低于相应的工作者分位数时,它才会被发送到较低级别的工作者。

3. 实验结果

我们在Amazon Web Services(AWS)EC2云上使用了五个合成的微基准测试应用程序,比较了Apache Spark 2.0.0、Apache Flink 1.0.3和Thrill。我们的基准和输入集基于HiBench,我们用Flink和Thrill的实现对其进行了扩展。

我们选择了五个微观基准:WordCount、PageRank、TeraSort、KMeans和Sleep。为了关注框架本身的性能,我们尝试使用每个框架同样很好地实现基准,并确保使用相同的基本算法。Spark和Flink可以用Java或Scala编程,只要有可能,我们将两者的实现都包括在内。Spark和FLink基准代码来源于不同地方,所有的Thrill实现是由我们编写的,并包含在Thrill C++源代码作为例子。当我们试图尽可能地配置Spark和Flink时,这些框架提供的配置选项的复杂性和数量使得我们可能遗漏了一些调整参数。在很大程度上,我们对HiBench保留了参数。实验是在输入的弱伸缩性下进行的,这意味着输入大小随着主机h的数量而增加,其中每个AWS主机有32个内核。

A. 微观基准

WordCount的Java和Scala的实现在中都可以从Spark和Flink附带的示例中获得。WordCount基准处理由C++ 版本的Hadoop的Ramdom Writer生成H·32 GIB大小的的文本。在这个随机文本中只有1000个不同的单词,我们不认为这是一个很好的reduce基准,因为只需要很少的数据进行通信,但是这个输入似乎是一个公认的标准。

对于PageRank,我们只使用了执行10次naive算法迭代的实现,其中包括当前权重与所有传出边的连接,以及新权重归约。我们从Spark的示例中获取实现,并将其修改为使用整数而不是字符串作为页键。我们采用了Flink的例子来计算没有标准化的PageRank并执行固定数量的迭代。Thrill使用ReduceToIndex和Zip模拟连接操作,并将页面id作为DIA的索引。实验的输入图包含h·4M个顶点,平均每个顶点39.5条边,总大小约为h·2.7GB,由HiBench中的PagerankData生成器生成。

TeraSort需要对100字节的记录进行排序,我们在每个框架中都使用了标准的排序方法。HiBench为Spark提供了一个Java实现,我们为Flink使用了一个非正式的Scala实现。利用Hadoop的teragen产生h·16GB的数据作为输入。

对于KMeans,我们使用了Spark和Flink示例中的实现。Spark调用它的机器学习包,而Flink的例子是一个完整的算法。我们用随机的初始质心确定了这两个算法都执行10次迭代,并在Thrill中实现了该算法。我们将维度数固定为3,因为Flink的实现需要固定的维度数,并且集群数为10。根据HiBench的设置,Apache Mahout的GenKMeansDataset用于生成h·16M个采样点,二进制Mahout格式转换为文本,以便使用Flink和Thrill进行读取(大小约为h·8.8GB)。

睡眠基准用来测量框架启动的时间开销。它在每个核心中启动一个任务,睡眠60秒。

B. 平台

我们在AWS上使用h r3.8xlege EC2实例执行了微基准测试。每个实例包含一个32 vCPU 2.5GHz 的Intel Xeon E5-2670 v2处理器、244 GB 内存和两个本地320 GB固态硬盘。我们使用内存基准测试工具测试性能为单核/L1缓存速度为86 GB/s、单核/RAM速度为11.6 GB/s和32核/RAM内存带宽为74 GB/s 。当读取8MB大小的块时,ssd达到460 MB/s,写入时达到397 MB/s。

h个实例被分配到一个AWS可用区域,并通过万兆网络相连接。我们的网络性能测量显示ping延迟约为100微秒,以及高达1 GB/s的持续点对点带宽。所有框架都使用TCP套接字来传输数据。我们尝试使用AWS S3、EBS和EFS作为基准输入的数据存储,但最终选择在EC2实例上运行单独的CephFS集群。在我们的实验中,Ceph提供了可靠的、可重复的性能和最小化的外部因素。每个EC2实例在本地SSD上携带一个Ceph ODS,我们将Ceph集群配置为只保留一个备份块,以最小化数据传输带来的带宽。我们没有使用HDFS,因为Thrill不支持它,而且基于posix的分布式文件系统(DFS)为所有框架提供了一个标准视图。另一个SSD用于框架创建的临时文件。

所有Spark实现都使用RDD接口。在Spark和Flink中支持容错不会产生额外的开销,因为没有编写检查点。默认情况下,检查点处于停用状态,必须显式配置。所有压缩都被停用,Spark被配置为使用Kyro序列化。

我们使用Ubuntu16.04 LTS(Xenial Xeros)系统,Linux内核为4.4.0-31、Ceph版本为 10.2.2(jewel)、Oracle Java版本为 1.8.0_101、Apache Spark版本 2.0.0、Apache Flink版本 1.0.3,并使用带有cmake的GCC5.4.0编译Thrill。

C. 结果




图4显示了当h=1、2、4、8、16个主机的基准测试结果。我们将时间除以一台主机上的输入字节数,这与每台主机上的item数成正比。图5显示的结果与图4相同,只是绘制为每个框架的运行时间比最快的运行时间慢。此外,我们使用来自Linux内核的信息测量了基准测试期间CPU、网络和磁盘I/O利用率的性能配置文件,并在表II和图6-7中显示了h=16的结果。

在所有的基准测试中,Thrill始终优于Spark和Flink,并且通常比其他框架快几倍。Thrill在于Spark和Flink的比较中,在单个主机上的速度增长最多,而随着网络和磁盘I/O成为瓶颈,增速开始变小。在WordCount中,文本从DFS读取,拆分成单词,单词对在本地减少。由于只出现了1000个独特的单词,因此总体结果很小,并且其通信可以忽略不计。Thrill通过DFS以1127MB/s的速度最大化网络利用率,并且只使用27%的可用CPU时间进行拆分和归约。Spark以939MB/s的速度最大化网络利用率,但占用了64%的CPU运行时间。Flink在16台主机中比Thrill慢了5.7倍,占用78%的CPU资源,且未达到最大网络利用率。通过散列表Thrill的归约速度非常快,其他框架对于相同的任务需要相当多的CPU时间。对于16台主机,由于网络文件系统的原因,Thrill受网络带宽限制,只比Spark(Scala)快1.28倍。

在PageRank中,当前的秩值与图的邻接表相连接,并通过网络传输,以对下一次迭代中的所有秩贡献进行归约。因此,PageRank基准测试在开始时的高CPU负载和归约时的高网络负载之间来回切换10次(见图6)。在16台主机上Spark(Java)比Thrill慢4.0倍,而Flink(Java)比Thrill慢1.6倍。Flink的流水线执行引擎在这个基准测试中运行良好,达到61%的CPU利用率和15%的网络利用率。从Spark的执行配置文件可以看出,由于散乱的程序,它在主机之间的工作不平衡。因此,每次迭代的时间都比需要的时间长。我们相信,通过实现连接算法,Thrill的性能可以进一步提高。

在TeraSort中,在16个主机中Thrill只比Spark快1.7倍,比Flink快1.18倍。Spark平均只有20%的CPU和42%的网络利用率,Flink为26%和45%,Thrill为25%和39%。在TeraSort中,Flink的流水线执行优于Spark,正如之前另一位作者所展示的那样。然而,由于CPU和网络利用率的原因,我们相信所有的实现都可以得到改进。

在KMeans算法中,质心集是广播的。然后将所有点按照最近的质心重新分类,然后通过归约从所有点确定新的质心。与PageRank一样,KMeans算法将高本地工作和高网络负载(减少和广播)交织在一起。Spark(Scala)比Thrill慢4.1倍,Spark(Java)比Thrill慢13倍,Flink比Thrill慢50倍。我们认为这是由于矢量的JVM对象开销,以及Spark和Flink广播质心的方式效率低下导致的。Flink的查询优化器似乎不能很好地用于其源包附带的KMeans示例。Thrill使用50%的CPU和25%的网络运行时间,而Spark则达到27%的CPU和7%的网络利用率。

Sleep基准突出显示了框架的启动时间。我们在图4中绘制了不包括睡眠时间的运行时间。Spark需要非常接近5+h*0.4秒才能启动。显然,主机不是并行启动的。Flink的启动时间要短得多,而Thrill则不到一秒钟。

4、结论和未来工作

我们已经证明,C++库可以被用作分布式数据处理框架,达到与使用java和Scala实现的系统一样的高级抽象,同时获得相当大的性能优势。在未来,我们希望一方面使用Thrill来实现先进的和高级的可伸缩并行算法(例如,用于构建简洁的文本索引)。Thrill已经被用于超过五种后缀排序算法、logistic回归和图形生成器。另一方面,在较低的层次上,我们希望使用Thrill作为一个平台,为大数据工具开发算法原语,从而实现大规模可扩展的负载平衡、通信效率和容错。

虽然到目前为止,Thrill是一个原型和研究平台,但本文的研究结果足以令人鼓舞地看到,它有可能发展成为主流的大数据处理工具。当然,在这个方向上还有很多工作要做,比如为其他流行工具(如Hadoop和AWS堆栈)实现接口,用Python等脚本语言创建前端以加快算法原型。为了实现大型集群配置的实际可扩展性和健壮性,我们还需要在负载平衡、容错和对高性能网络(如InfiniBand或OmniPath)的本机支持等问题上进行重大改进。

此外,我们认为在Thrill中引入额外的操作和数据类型(如图和多维数组)非常有用(另见第II-D节)。但是,我们不确定像Flink中那样的自动查询计划优化是否应该是Thrill的一个关注点,因为这使得在对计算有足够控制的情况下实现复杂算法更加困难。相反,最好使用Thrill作为更高级别工具的中间语言,该工具不再是一个简单的库,而是一个带有查询优化器的真正编译器。

Thrill: 基于C++的高性能分布式批处理算法相关推荐

  1. 10没有基于策略的qos_分布式QoS算法解析

    QoS对于服务多租户多业务的整体系统来说,不管对网络还是存储,都格外重要,没有QoS,会造成不同租户及业务之间对资源的抢占,用户A用爽了,用户B却遭了殃,频频投诉,这是系统管理员最头疼的事情.我们今天 ...

  2. 基于zeromq的高性能分布式RPC框架Zerorpc 性能测试

    Zeromq 是基于zeromq.gevent和 msgpack开发的分布式RPC框架zerorpc-python.这个框架简单.易用. 1. 安装zeromq 1 2 3 4 5 6 yum -y ...

  3. 【大论文】可扩展机器学习的并行与分布式优化算法综述_亢良伊2017

    一.基础知识: 1.目标函数 机器学习要优化的目标函数一般表现为一下形式: 函数J(θ)为目标函数,f为表示真实值与拟合值之差的损失函数,r(θ)为正则项(防止过拟合问题,主要分为L1正则项.L2正则 ...

  4. voyage java_GitHub - yezilong9/voyage: 采用Java实现的基于netty轻量的高性能分布式RPC服务框架...

    Voyage Overview 采用Java实现的基于netty轻量的高性能分布式RPC服务框架.实现了RPC的基本功能,开发者也可以自定义扩展,简单,易用,高效. Features 服务端支持注解配 ...

  5. 美国南加州大学骆沁毅:构建高性能的异构分布式训练算法

    计算机体系结构领域国际顶级会议每次往往仅录用几十篇论文,录用率在20%左右,难度极大.国内学者在顶会上开始发表论文,是最近十几年的事情. ASPLOS与HPCA是计算机体系结构领域的旗舰会议.其中AS ...

  6. 分布式共识算法 (Consensus Algorithm)

    分布式共识算法 (Consensus Algorithm) 如何理解分布式共识? 多个参与者 针对 某一件事 达成完全 一致 :一件事,一个结论 已达成一致的结论,不可推翻 有哪些分布式共识算法? P ...

  7. 海量数据处理_国家重点研发计划“面向异构体系结构的高性能分布式数据处理技术与系统”简介...

    技术发展现状 近年来,数据规模快速增长,使得Hadoop.Spark等大数据批处理系统在现实中得到了广泛应用.同时,应用对数据处理时效性需求不断加强,促使诸如Flink的大数据流式处理系统应运而生.现 ...

  8. 美团(Leaf)分布式ID算法

    本文来说下美团(Leaf)分布式ID算法 文章目录 概述 Leaf特性 Leaf-segment数据库方案 双buffer优化 Leaf高可用容灾 Leaf-snowflake方案 弱依赖ZooKee ...

  9. 批处理等待上一条完成再执行下面的_分布式批处理实现方案

    本文转自: 任务和调度:理解批量处理的关键设计-云栖社区-阿里云​yq.aliyun.com 1.1.什么是批量处理 本文讲的是任务和调度:理解批量处理的关键设计,维基百科给批量处理的定义是指在没有人 ...

最新文章

  1. 套用match_再也不用伤脑筋了,分享九个可以直接套用的Excel常用公式,收藏备用...
  2. 注册和登陆与数据库的链接
  3. 使用远程工具连接提示**Host *** is not allowed to connect to this mysql server**拒绝连接错误
  4. 面试宝典系列-PHP变量在内存中的存储方式
  5. JAva入门 活着_java基础回顾
  6. 支持向量机实现鸢尾花数据集分类matlab
  7. 计算机网络按信息传输介质的性能来划分,大学计算机基础练习题网络技术.doc...
  8. 计算机导论123出栈顺序,优·计算机导论复习提纲.doc
  9. 期货大赛项目|八,ueditor的应用
  10. 【数字信号处理】基于matlab GUI数字调音台【含Matlab源码 881期】
  11. win10下Clion的安装与配置
  12. 13家电脑品牌来源大揭底
  13. Matlab RRT算法三维轨迹规划及贪心算法轨迹优化
  14. 【74系列芯片的Verilog重现(一)】------74HC00
  15. 深入浅出WPF笔记——属性
  16. QRCode.js 生成二维码(文字信息二维码)
  17. Django-rest-framework框架之drf内置认证,权限,频率类,异常处理与自动生成接口文档
  18. nodejs后端接入阿里云《天眼数聚》银行卡四要素验证
  19. VS 和 VS Code 更换字体
  20. 微软官网下载win10系统

热门文章

  1. Gridview的HyperLinkField操作
  2. 第六周 day6 python学习笔记
  3. 支付宝免签,个人收款方案
  4. Java最大值最小值问题(用户输入)
  5. 【转】旅游推荐系统的演进
  6. java套娃_Java也有俄罗斯套娃,内部类剖析
  7. rpm安装Mysql的rpm包,提示/bin/sh is needed by MySql.rpm 错误的问题解决
  8. 【opencv】18、视频操作
  9. (转)纵观国内外证券公司IT发展史
  10. SRAM/SDRAM/DDR/Cache