Abstract

流处理工作负载和现代共享集群环境表现出高度的可变性和不可预测性。 结合大量参数空间和各种用户SLO,这使得现代流处理系统非常难以静态配置和调整。 为了解决这些问题,在本文中,我们研究了一种新颖的控制平面设计Chi,它支持连续监测和反馈,并支持动态重新配置。 Chi利用在数据平面通道中嵌入控制平面消息,为流处理系统实现低延迟和灵活的控制平面。

Chi引入了新的反应式编程模型和设计机制,以异步执行控制策略,从而避免全局同步。 我们展示了如何使我们能够轻松实现针对生产中观察到的不同用例的各种控制策略。 使用来自云提供商的生产工作负载的大规模实验证明了我们方法的灵活性和效率。

1. Introduction

亚马逊,Facebook,谷歌和微软等大型互联网服务提供商每秒产生数千万个数据事件[5]。 为了处理如此高的吞吐量,他们传统上采用离线批处理系统[21,13,4]。 然而,最近,使用流媒体系统[3,10,27,31]以确保及时处理并避免离线批处理系统通常产生的延迟的趋势越来越明显。

然而,完全实现这些在线实时系统所承诺的好处尤其具有挑战性。首先,流处理工作负载表现出较高的时间和空间可变性,与平均负载相比可达到一个数量级[27,31]。其次,大型共享集群表现出高硬件异构性和不可预测的连接使用率。第三,现代流媒体系统暴露出大的参数空间,即使对于经验最丰富的工程师来说也很难调整。此外,不同的用户和作业具有不同的服务级别目标(SLO),导致不同的配置设置。解决这些问题需要将持续的监控和反馈以及动态重新配置引入流系统的各个方面,从查询规划和资源分配/调度到参数调整。我们将负责这种控制机制的系统层称为控制平面,以将其与数据平面(负责数据处理的层)区分开来。

通过我们与产品团队和云运营商的互动,我们确定了控制平面应满足的以下要求。 首先,应该可以定义新的自定义控制操作,以适应不同的场景[24]。 其次,这应该通过开发人员的最小努力以及通过简单直观的API来实现,以最小化错误的可能性,这在分布式环境中检测尤其具有挑战性。 最后,即使存在高事件吞吐量和大型计算图,控制开销也应保持最小。 在今天的情况下,这一点尤其重要,因为不同的云提供商难以为其客户提供最好的SLO。 理想情况下,控制平面应与数据平面SLO匹配(通常为秒或更小)。

不幸的是,据我们所知,现有的流处理系统都不能满足所有这些要求。 Heron [27]和Flink [10]拥有一个单片控制平面,仅支持一组有限的预定义控制策略(例如,动态缩放和背压),并且缺少干净的API,这使得用户很难定义自定义策略。 Spark Streaming [41]采用批量同步并行(BSP)模型[38],其中一组事件被缓冲并作为批处理。 虽然这允许系统修改批次之间的数据流,但是由于硬批量边界而导致灵活性有限,并且由于所需的同步和调度操作而导致高开销。

为了克服当今系统的缺点并满足上述要求,我们探索了一种用于流处理系统的新型控制平面设计。 受到用于数据Operator的想法[37]的启发,我们建议将控制平面消息嵌入到数据流中。 通过利用现有的数据管道,控制消息可以低延迟和可扩展的方式进行流式传输,而无需任何特殊的机制(第7.2节)。 然而,这种看似简单的方法需要底层流式基础架构的支持,以一致性要求执行分布式控制操作,同时最小化同步开销并启用可扩展控制编程模型。

在本文中,我们描述了Chi,一种基于这种在数据流中嵌入控制消息以执行低延迟控制操作的想法的控制平面。我们引入了一个用于处理控制消息的反应式编程模型,允许用户编码许多复杂的控制策略。这隐藏了底层系统的复杂分布式特性,并为开发人员提供了一个直观的模型来理解应用控制事件的数据事件的边界。然后,我们设计了一种机制,以便在每个操作员以异步方式执行这些控制策略,从而避免同步和复杂运行时开销。我们通过使用我们的编程模型和执行机制表明,我们可以有效地实施一系列控制策略,从检查点和重放等基本功能到具有强大全局一致性要求的高级功能,如连续监控,计划重新优化和参数调整(§5)。最后,我们还讨论了如何通过在每个操作中使用单独的控制回路来轻松并行化我们的控制平面基础架构,从而使控制平面可扩展。

为了验证我们的声明并评估新控制平面设计的运行时性能,我们在Flare之上实现了Chi,Flare是我们集群中使用的内部流处理系统之一。 Flare建立在Orleans [7]之上,这是一个高效的分布式actor框架,并使用Trill [16]作为底层流处理引擎。虽然我们选择在Flare之上展示我们的方法,以便在我们的内部集群上轻松实现和部署,但我们的设计并不依赖于特定平台,并且通过一些额外的工程工作,它可以应用于现有实时流处理系统,包括Heron和Flink。我们基于生产工作负载和行业标准基准测试的实验评估表明,Chi能够在32个服务器群集上在5.8秒内对大型有状态数据流进行重新配置。实际使用案例进一步显示了Chi在帮助开发人员自动调整关键系统参数和减少61%的延迟方面的有效性,从而减少了运行时的工作负载偏差。

总之,本文做出以下贡献:

•通过从200,000多台生产服务器收集的日志对当今的流处理工作负载进行广泛研究。

•可扩展且高效的控制平面,允许流媒体系统有效地适应工作负载或环境的变化。

•灵活的控制平面API,使开发人员能够轻松实现自定义控制操作。

•使用来自云服务提供商的生产工作负载以及大量控制操作(包括动态扩展,故障恢复,自动调整和数据倾斜管理)评估我们在32服务器Azure VM群集上的原型实施。

2. MOTIVATION

我们通过分析现在流行的云提供商的数据分析集群的200,000多台服务器生成的日志,进行了广泛的测量研究。 这些集群生成大量日志数据(10s PB /天),开发人员执行对此数据的查询以进行调试,监控等。鉴于数据大小,我们需要具有足够网络和计算资源的大型集群实时处理数据。 我们的设置与最近对Google生产日志的分析一致[19]。

我们首先总结分析的主要结果,并讨论出现的机会。

工作负载不可预测性:服务器提取日志事件的负载变化很大。 图1(a)显示了我们集群中随机流子集每分钟生成的标准化元组数的热图。 这显示了两个重要的现象。 首先,通过水平线上的不同颜色模式证明,每个流呈现出独特的工作负荷特征(空间变异性)。 其次,虽然一些流在大多数时间内是暗的(高容量),但是几个流表现出突发性(时间变化),如同色水平线的色点所示。

日志事件中存在这种可变性的主要原因有三个:

  1. 异构工作负载:生成这些日志的集群处理各种工作负载,范围从典型的大数据作业(例如,过滤,转换,连接)到代表数百个团队的迭代机器学习工作负载。
  2. 故障和调试:故障在大规模中很常见。 这些故障(例如,网络问题,电源问题等)会产生大量错误日志,从而导致流量突发。 此外,当摄取数据不足时,开发人员会暂时激活更详细的日志以执行深入调试,从而产生更高的流量。
  3. 不同的流语义:我们摄取的日志具有不同的语义(例如,信息,详细,调试,错误等),服务中的各种组件具有不同的特征。 最低级别的服务(例如,存储层)自然地产生最多的日志,因为大多数请求涉及商店I / O事务。

为了量化可变性程度,在图1(b)中,我们显示了一个盒子和胡须图,其中Y轴表示在输入数据流子集上每分钟事件计数的增量。 框的高度显示第25和第75百分位数之间的计数差异。 除了每个流的行为的显着差异之外,值得注意的是在相同流中观察到的高变化范围,由大量异常值(图中的蓝点)表示。 例如,对于某些事件流,事件计数可在一分钟内增加高达数千万,表明及时响应对于保持负载峰值至关重要。

数据多样性。确定最佳查询计划和资源分配的另一个关键因素是数据分布。 为了分析其动态,我们关注key selectivity,定义为落入特定存储桶的元组数。 为了分析这个参数的动态,在图1(c)中我们绘制了各种分组key随时间的选择性,而在1(d)中我们绘制了多个分组密钥随时间的选择性。 在两个图中观察到的选择性的宽偏差表明,一次性或基于每个key的一次性查询计划(例如,基于传统基数的优化器)可能随时间推移是次优的。

多租户控制策略:在我们的生产环境中,许多团队使用相同的流处理系统。 因此,来自不同团队的查询或甚至来自同一团队的不同查询的SLO存在大量差异,导致需要同时激活多个控制策略。 例如,我们有许多客户有兴趣消费详细,信息和错误日志。 虽然详细和错误日志用于调试,但信息日志通常用于计算重要指标(例如,计费指标)。 我们的开发人员提出的一个流行请求是为信息日志和较弱的传递语义(例如,尽力而为,至少一次)提供更强的传递语义(例如,恰好一次)用于详细/错误日志。 这突出了在每个流级别或每个租户级别上支持多个控制策略的重要性。

多租户还在系统管理和优化方面引入了新的机会。 在我们的生产跟踪中,我们观察到在数据源和运算符方面共享重要重叠的查询(例如,用户以相同的方式解析日志)。 因此,控制策略可以考虑跨用户共享计算或者对中间数据进行物化以供以后重用。 此外,通过对数据流和工作负载的洞察,可以使用控制策略来自动且透明地选择可以提高执行效率的新数据布局(例如,利用分区)和存储引擎(例如,存储器内,数据库)。 虽然所有这些优化都被孤立地理解[33,34,40,32],但以集成方式应用它们来优化流式工作负载需要控制平面的灵活性和可扩展性。

目标:我们的跟踪分析揭示了我们工作负载的几个有趣的特征 - 高容量(10 / PB /天),低延迟要求(SLO秒),广泛多样化(100s数据流)和动态(由于 生产这些日志的服务的性质)。 基于此,我们可以得出以下控制平面要求列表:

  1. 高效且可扩展的反馈回路控制:由于我们的工作负载的多样性,允许用户或应用程序对其数据处理逻辑做出灵活的后期绑定决策以优化性能非常重要。 从可扩展性的角度来看,应该可以与专用组件集成(例如,Dhalion [24]等策略控制器)。
  2. 易于控制的界面:由于我们希望应用程序开发人员使用控制平面,因此具有更简单的编程接口对于其采用至关重要。
  3. 对数据平面的最小影响:控制平面应该对数据平面的延迟和吞吐量有限或没有影响,它应该能够无缝地处理高控制频率和大数据流图。

3. BACKGROUND

Chi主要设计用于基于流数据流计算模型的流系统。 在本节中,我们为读者提供了有关流数据流计算模型的必要背景知识。许多现有的流处理系统,如Naiad [30],Stream-Scope [28]和Apache Flink [10]采用流数据流计算模型。 在此模型中,计算作业表示为有状态运算符的有向无环图(DAG),其中每个运算符沿着指定的边发送和接收逻辑上带时间戳的事件。 每个Operator都维持可变的本地状态。 接收到事件后,Operator更新其本地状态,生成新事件,并将其发送给下游运营商。 没有Input通道的Operator是Source; 那些没有Output通道的Operator是Sink。

例如,考虑一个例子,其中用户有兴趣将句子标记为单词,并以数据并行方式计算所有句子中每个单词的加窗口累加计数(例如,每小时)。 此查询可以用LINQ样式语言表示,如下所示:

示例1的数据流图的实例在图2的阶段I中示出。注意,运算符{R1,R2}将字的累积计数保持为其可变的本地状态。 这些状态覆盖了一组不相交的密钥,并共同代表整个密钥子空间,例如,R1保持所有单词的累计计数,起始字母在['a' - 'l']范围内,而R2保留了 范围['m' - 'z']。

数据流计算模型:形式上,数据流计算表示为DAG G(V,E),其中V表示操作符集,E表示连接运算符的边集,u→v表示有向边运算符u到v。我们使用{·→v}来表示v的输入边,而{v→·}来自v的输出边。一个Operator v∈V由三元组(sv,fv,pv)描述,其中sv是v的状态; fv定义了一个捕获在v上运行的计算的函数,即f:sv,mei∈{·→v}  - →sV,{M eo∈{v→·}},意思是函数从输入边ei获取单个输入消息m,并且基于当前状态sv,它将v的状态修改为新的状态sv,并在一组out-上生成一条或多条出边{m eo∈{V→·}}。没有输入边的Operator称为Source,没有输出边的Operator称为Sink。一般来说,我们将与v不相关的属性表示为pv的状态,例如,pv可以定义v使用的最大内存。边e不具有任何状态但可以保持属性pe,例如,窗口的token大小用于触发背压条件。

4. DESGIN

将控制平面嵌入数据平面背后的直觉是,这使得能够重新使用现有的,高效的数据平面基础设施,并为开发人员提供熟悉的API来编写控制操作,即用于处理数据事件的操作。 此外,让控制消息直接跟踪数据事件提供了一种在事件序列之间创建自定义边界的自然方式。 这使得实现异步控制操作变得容易,因为控制消息可用于捕获因果依赖性,而无需昂贵的全局同步操作。

在本节中,我们将展示如何将这些原理融入我们的控制平面设计中,并提供几个示例来描述我们如何轻松地在顶部构建不同的控制操作。 我们通过描述我们设计的一些更高级的功能并讨论如何使Chi适应支持BSP风格的计算来结束本节。

4.1 Overview

Chi依赖于以下三个功能。 首先,Operator之间的Channel通道支持exactly-once和FIFO传送消息。 当下游Operator的缓冲区填满时,背压用于停止消息的发送。 其次,Operator一次一个地处理消息,并按照接收消息的顺序处理消息。 最后,底层引擎提供基本的Operator生命周期管理功能。 具体来说,它允许我们启动,停止和杀死一个Operator。 我们的内部流媒体系统Flare已经支持这些功能,但它们也可以在其他现有系统中找到[10,27,39,41]。

我们的系统设计使用数据流控制器,负责监控数据流行为和外部环境变化,并在需要时触发控制操作。 用户可以定义控制操作,并将其提交给控制器。

通过控制回路执行控制操作,该控制回路包括数据流控制器和数据流拓扑本身。通常,控制回路由三个阶段组成:(阶段I)控制器做出控制决策并使用唯一标识符实例化控制操作。控制操作通过实现反应式API(第4.2.2节)来定义,并且具有每个Operator的控制配置(例如,用于扩展数据流的新拓扑)。控制操作被序列化为控制消息(第§6节)。 (阶段II)控制消息由控制器扩展到数据流的所有Source操作符。然后,控制消息通过数据流传播,与正常数据消息交织。在传播期间,在接收到控制消息时,每个Operator触发相应的控制动作 - 其可以可选地将附加数据(例如,重新分区状态)附加到控制消息 - 并将控制消息广播到所有下游Operator。有关更多实现细节,请参见第4.2.2节和第6节。 (阶段III)最后,the sink Operator将控制消息传播回控制器,并且控制器执行后续处理。

我们使用图2来说明这个过程。 我们考虑控制器希望通过修改拓扑并添加新的reducer来增加吞吐量的情况。

  1. 在刚刚开始时,有两个Map Operator {M1,M2}和两个Reducer Operator {R1,R2},它们为摄取的句子计算单词计数。 这些Operator是有状态的。 例如,{R1,R2}保持所有单词的累计计数,其中R1保持以['a' - 'l']开头的所有单词的计数,R2保持['m' - 'z的计数“]。 控制器C负责监视所有Operator的内存使用情况,并在需要时重新配置并行性。 为了简化,我们省略了收集内存使用情况的控制消息,并在以下讨论中关注并行性重新配置过程。
  2. 一旦控制器C检测到Reducer的聚合内存使用率使用超过阈值它就做出重新配置决定以启动新的Reducer R3以增加内存的供应。 在新拓扑中,{R1,R2,R3}的状态应该重新分区,以便R1保持范围['a' - 'h']的字数,R2代表['i' - 'p'], 和['q' - 'z']的R3。 这种重新配置需要保持状态的一致性。 首先将带有新拓扑配置的控制消息广播到所有源节点(→{M1,M2})
  3. 当Source(Mapper M1或M2)接收到该控制消息时,映射器在处理消息时立即阻塞输入信道,用新拓扑更新其路由表并向下游广播消息(→{R1,R2,R3})。
  4. 当reducer R1(或R2)收到控制消息时,它会阻止接收消息的输入通道。 当收到来自所有输入通道的控制消息时,它会更新其路由表并检查其状态。 接下来,它将状态分为两部分:范围['a' - 'h']的累积字数和范围['i' - 'l'](或范围['m' -  'p']和['q' - 'z'])并且处理需要由R3处理的状态,即['i' - 'l']的字数(或对于['m '-'p'])到控制消息并沿所有输出通道广播(→{R3,C})。
  5. 当R3收到控制消息时,它会阻止该输入通道。 如果控制消息源自R1(或R2),它将从控制消息中记录状态。 当它接收来自所有输入通道的控制消息时,它继续合并所有接收的状态,生成范围['i' - 'p']的累计字数的新状态,并安装一个新函数(来自 控制消息)使用新状态。 最后,它在输出通道上广播(→C)。
  6. 当C从所有预期汇聚节点{R1,R2,R3}接收控制消息时,完成横向扩展操作。 然后,控制器在新拓扑中持续监视这些运营商的内存使用情况,并且可以根据需要决定Scale In/Scale Up。 这形成了反馈回路控制。

4.2 Control Mechanism

接下来,我们将描述支持Chi的核心机制。 我们首先正式定义数据流计算模型,并解释图形转换是如何发生的。 然后,我们讨论控制器和运算符API并提供示例控制操作实现。 我们在§4.2.3中提供了正确性证明。

4.2.1 Graph Transitions through Meta Topology

形式上,用户控制操作C可以被建模为将数据流执行图G(V,E)转换为新图G *(V *,E *)的变换。 对于运算符v,这种变换可以改变三元组中的一个或多个条目(S,f,P)。 对于边e,这种变换可以可选地改变pe。 特别地,由于Operator状态S可以捕获在长时间段内累积的状态,因此需要特别小心以在重新配置期间捕获状态的变换(即,G→G *)。 也就是说,对于v *∈V*,Sv *由一个或多个节点{v}⊆V上的变换函数T定义,即T({Sv})= Sv *。 在没有歧义的情况下,我们放松符号并使用T-1(v *)来表示状态Sv *所依赖的集合{v}⊆V。

大多数现有系统(例如[10,39])采用freeze-the-world方法来执行转换,即通过适当的检查点机制停止G,启动G *,将G上的旧检查点状态迁移到G *并恢复数据流。但是,这可能会触发反压,导致延迟增加和吞吐量损失,进而限制数据流重新配置的执行频率和表现。因此,在Chi的设计中,我们选择了异步替代方案:我们不是直接影响转换(即G→G *),而是引入一个中间元拓扑G 1,控制操作可以暂时利用它来完成异步转换。也就是说,在传播控制消息期间,每个Operator根据G1向其下游广播消息。在 G1 -G * 子图下的Operator处理完控制消息后,将关闭;而G1 -G * 子图下的Operator仅在完成控制消息处理后才开始处理数据消息。当控制消息传播完成时,产生的拓扑将等同于G *。

我们推导出元拓扑G* 对于控制操作C如下。 在最一般的情况下,我们设置G*=G∪G*∪EV,V *,其中EV,V * = {(v,v *)|∀v*∈V*,∀v∈T-1(v *)}。 换句话说,C的传播图包含来自新旧执行图的所有运算符和通道,以及捕获旧操作符和新运算符的状态之间的依赖关系的通道。 虽然这种方法可以在重新配置期间将数据流执行图的大小加倍,但实际上,我们可以通过适当的修剪来显着减少传播拓扑。 例如:

状态不变性。 如果控制操作没有改变节点v的状态Sv,我们可以用v∈G折叠相应的新节点v *∈G*,并合并与v相邻的输入和输出通道。例如,在图3(a)中 ,M * 1(M * 2)可以分别与M1(M2)合并。

非循环不变性。 只要我们能够保证图形的共生性,就可以积极地合并新旧拓扑。 例如,在图3(a)中,我们可以用R1(R2)进一步折叠R1(R2),而不破坏环状。 这是通过(i)确保初始数据流拓扑是非循环的功能查询接口以及(ii)修剪算法来保证的,该算法确保在优化元拓扑期间不引入循环。 例如,对于横向扩展/重新配置,修剪算法使用一致散列作为状态分配方案,以避免在重新分区状态时引入循环。

通过在图3(a)中重复应用上述修剪规则,我们获得了图2中阶段(IV)所示的图形。

4.2.2 Control API

接下来,我们将介绍控制API的功能,使开发人员能够实现复杂的控制操作。 Chi的API允许在以下维度上表达不同的行为:(1)空间,例如{M1,M2}的行为不同于{R1,R2,R3},以及(2)时间,例如R3在接收时的行为 第一个控制消息与图2中的最后一个消息。

为了实现这种灵活性,我们抽象控制操作并向用户提供以下功能:

       Configuration injection:我们允许相同的控制操作为不同的Operator提供不同的配置。 配置指示Operator采取不同的控制操作。 配置被注入控制消息中(参见图6以实现)。 运行时通过每个Operator的正确配置适当地实例化控制操作。 在图2所示的向外扩展示例中,注入的配置需要指示(1)映射器重新连接输出通道,(2)减速器R1和R2以迁移状态,以及(3)新的减速器R3到 接受迁移的国家。 如算法1(L1-11)所示,R1注入SplitState(L6)和LoadFunc(L9)指令,R3注入MergeState(L8)和LoadFunc指令。

       Reactive execution:Chi公开了一个被动(事件驱动)的编程接口,用户可以利用该接口来定义控制操作。控制操作包括两组事件处理程序:在控制器{OnInitAtController,OnBeginAtController,OnNextAtController,OnCompleteAtController,OnDis- poseAtController}执行的事件处理程序,以及在运算符{OnBegi- nAtOperator,OnNextAtOperator,OnCompleteAtOperator,OnDis- poseAtOperator}执行的事件处理程序。 。这些事件处理程序为用户提供了极大的灵活性,可在控制器和操作员收到第一个,下一个和最后一个控制消息时收集指标或修改配置。初始化控件操作时调用OnInitAtController,并允许用户将配置注入控件消息。处理控件操作时将调用OnDisposeAtController和OnDisposeAtOperator。它们通常用于释放资源。运行时处理这些处理程序的正确调用和状态转换,如图3(b)所示,从而支持以安全的方式表达复杂的控制逻辑。

       Blocking behavior:在图2的示例中,运算符在从其接收到控制消息时总是阻塞输入通道。我们发现这在许多控制场景中是一种相当普遍的模式,例如使用Checkpoint的放大/缩小操作。为了简化复杂控制的实现,我们提供了一个抽象层,允许用户以阻塞和非阻塞的方式实现其控制操作。我们通过将控制消息分为两类来实现这一点:(1)阻塞:Operator阻止接收控制消息的相应信道,并且只有当该操作符上的所有控制操作都完成时才解除阻塞, (2)非阻塞:Operator不阻塞输入通道并继续接收该通道上的其他数据/控制消息。我们相信这种抽象对于用户表达更高级的控制操作非常有用。例如,阻塞控制消息通常对影响状态的控制有用,而非阻塞控制消息对于其他情况(例如监视)是有用的。

Example:我们使用图2中所示的示例演示控制API的用法,其中我们要从G缩小为G *到G(如图3(b)所示)。算法1示出了数据流重新配置控制操作的伪实现。完成重新配置决策后,开发人员会创建阻止控制消息。在OnInitAtController中,创建阻塞控制消息并注入配置,以便在不同的操作员处触发控制操作。具体来说,如§4.2.1中所述,边(v,v *)∈EV,V *描述了v *和v的状态之间的依赖关系,运算符v需要被配置为拆分其状态并运送相应的部分状态为v *,而操作符v *需要配置为加载和合并接收状态(L5-8)。例如,如图2所示,R1需要分割状态并将范围['i' - 'l']的累计计数运送到R3。一旦拓扑或状态改变,操作员需要重置相关的计算功能(L9)。这样的控制消息被广播给源操作员。它首先初始化保存迁移状态的会话变量,并在OnBeginAtOperator函数中控制动作(L15-16)。迁移状态逐渐累积,直到OnNextAtOperator(L18-19)接收到状态的所有部分。一旦接收到所有消息,操作员(在OnCompleteAtOperator函数中显示)执行控制动作,包括根据新的状态键范围(L23-25)移除不再保持的状态,合并其他人给出的状态(L26-27) ,并重置功能(L28-29)。一旦控制器从接收器操作员接收到控制消息,控制操作就会在OnCompleteAtController(L12)中标记为已完成。

4.2.3 Correctness Properties

Chi提供正确性属性,可以帮助用户证明其控制操作的正确性。

THEOREM 1. Consider a control operation that changes a graph from G to G∗ using a control message and a state transformation function T. The control operation has the following properties:

1. The control operation will terminate in finite time.

2. If a pair of operators v,v? satisfies (a) v → v? is an edge in G or G∗, or (b) v ∈ T−1(Sv?), then v will always invoke OnCompleteAtOperator before v?.

此外,我们引入了安全阻塞控制操作 - 一种特殊类型的阻塞控制操作,其每个操作员的控制操作仅在OnCompleteAtOperator中读/写相应的操作员状态。 安全阻塞控制操作具有更强的属性 - 安全阻塞控制操作的语义等同于冻结世界方法 - 这可以帮助用户理解其定制控制操作的语义和正确性。 有关定理1的详细说明和证明以及安全阻塞控制操作的属性,请参阅技术报告1。

4.3 Advanced Functionalities

我们讨论了Chi的高级功能,这些功能是应对生产工作负载所必需的。

       Multiple Controllers。到目前为止,我们的描述假设一个数据流控制器强制执行控制操作。 我们的设计能够通过允许单个数据流的多个电流控制器自然地扩展控制器。 例如,用户可以每个所需功能具有一个控制器,例如,一个控制器负责监视数据流执行的健康状况,另一个控制器负责监视运行状态,而另一个负责重新配置数据流执行图以防万一激增的工作量负载。 只要保证不同控制操作的控制消息的可串行性,多个控制器就可以同时工作。 为了执行跨数据流控制操作,例如,协调跨多个数据流的资源分配,我们可以引入可以与每个数据流的控制器交互的全局控制器。

       Broadcast/aggregation trees. 在实践中,数据流图通常具有大量Source Operator(有时还有Sink Operator)。 在这种拓扑结构中,由于大的fan in/fan out,控制器很快就会成为瓶颈。 为了缓解这种情况,我们利用一种简单的技术,例如在Source(Sink)Operator之前(之后)插入跨越BroadcastAggregationTree

       Dealing with congestion/deadlock. 当出现拥塞时,例如,由于网络或CPU瓶颈,我们的反压机制被触发,所有消息(包括控制消息)都被延迟。 如果这些消息是缓解拥塞的控制操作的一部分,则这可能尤其重要。 一种选择可能是拥有两个单独的队列并使控制消息具有更高的优先级,以便在拥塞时首先交付它们。 然而,这会破坏控制和数据消息的顺序,从而难以保持一致性。 因此,我们等待处理消息。 这类似于其他系统采用的方法,如Flink [10]和Spark Streaming [41]。

       Fault tolerance。 集成控制和数据平面的主要好处之一是控制平面中的故障的处理方式与数据平面中的故障相同。 更具体地说,如果控制消息丢失,则底层数据信道负责重传它们,直到它们被另一端确认为止。 在网络分区的情况下,控制器最终会超时并将重新启动控制操作。

Chi允许开发人员实施各种策略来处理操作员故障。 为了便于采用,我们在Chi之上实施了一个检查点重放策略,以便在默认情况下处理操作员故障。 此策略将首先将数据流流回滚到最后一个检查点,然后它将重新插入丢失的控制消息。 控制器中的故障通过将其自身状态检查到持久存储中并在启动故障控制器的新实例(通常由监视器处理)时恢复状态来处理。

4.4 Compare Chi with Existing Models

接下来我们将Chi与流处理系统的现有控制模型进行比较(表1总结了比较)。 正在开发各种控制模型,针对不同的计算范例进行定制,即基于BSP的微批处理与一次记录。 我们根据一致性,易用性,开销和可扩展性来比较不同的模型。

       Consistency. 许多有用的控制操作,例如缩小,状态重新分区和检查点,确实需要一致性。 通过在并行计算(称为同步全局控制)之间使用同步障碍,可以在BSP系统中轻松实现一致性。 Chi也可以使用阻塞控制消息来实现这种一致性,阻塞控制消息充当在数据流内异步移动的屏障。 如果需要,Chi可以复制BSP模型。 具体地,控制器可以用作如图4(a)所示的屏障节点。 当一个阶段开始时,它产生(1)一个阻塞控制消息(用S表示),它在操作员处安装任务,然后是(2)描述输入数据的数据消息(用D表示),以及(3)a 阻止控制消息(由E降级),标志着一个阶段的完成。 当接收到所有完成消息时,控制器通过重复相同的消息序列来启动下一阶段。

通过在重新配置期间冻结整个数据流,也可以在一次记录系统中实现一致性。然而,这需要系统停止(如BSP),从而激励像SEEP [11]这样的系统异步重新配置工作程序(命名为异步本地控制);但牺牲了障碍的语义。异步本地控制也可以使用Chi的非阻塞控制消息来实现。缺少障碍使得很难实现许多有用的控制操作。考虑将过滤器应用于具有x和y字段的流的数据流(如图4(b)所示),其中由于隐私调节,过滤器参数存储在两个数据存储中。因此,必须复制流以并行过滤。最后将过滤结果连接起来。在仅提供异步本地控制的系统中,它无法支持请求简单并发重新配置的控制请求,例如将过滤器选择性从x <A和y <B更改为x <C和y <D。这是因为每个复制的元组必须同时处理所有新配置,而不是它们的混合。为了确保一致性,如SEEP [11]中的算法3所示,数据流必须阻止摄取,恢复操作员检查点,应用新配置,重放自上次检查点以来复制操作符的输出,以及取消阻塞摄取。

Easy-of-Use 除了一致性保证外,Chi还提供灵活的控制平面接口,并提供全面的自动化支持。 用户以被动方式声明控制逻辑,并依赖运行时自动管理控制操作的状态,处理故障并异步执行操作。 相比之下,现有的控制模型缺乏可编程性和运行时支持。 例如,Flink实现了一种分布式检查点算法,该算法无法支持一般控制操作,例如,需要更改状态。 最近,Dhalion研究了控制政策的高级代表性,特别关注识别检测到的异常的症状和疗法。 它依赖于底层系统,即Heron,来提供实际的重新配置能力。 因此,Dhalion不具有作为Chi的控制平面运行时间,其可以应用于一般的一次记录流式传输系统。

       Overhead. 重新配置BSP系统时,将停止整个数据流。这对于在线流处理系统尤其不利,并且会影响延迟和吞吐量。此外,BSP障碍严重限制了控制操作的频率和时间。为了分摊调度和通信成本,BSP屏障间隔通常设置为不小于秒[39]。如上所述,重新配置一次记录系统需要像在Flink中那样freeze-the-world(第§7节),或者像在SEEP中那样通过数据重放进行重新计算。一方面,freeze-the-world会产生与同步全球屏障相似的开销。另一方面,尽管重新配置的Operator通常已经缺乏资源,但重放数据在生产中是昂贵的。我们的跟踪显示,缓冲所有中间输出以准备重放需要大量内存,比操作员计算状态消耗的内存大几个数量级。重放如此大的缓冲区状态不仅消耗大量带宽,而且还会长时间阻塞运营商。

在Chi中,更改应用于异步全局障碍。无需冻结世界或数据重播。阻止行为是每个运营商的本地行为。没有全局阻塞冻结整个数据流,从而减少了数据平面开销。
       Scalability. 现有的流处理系统通常具有单独的控制平面。 这重复了控制平面的资源,以处理典型的分布式系统问题,例如容错和可伸缩性。 然而,Chi将控制平面嵌入数据平面。 由于数据平面针对处理大量数据进行了优化,因此Chi受益于相同的优化,例如,零拷贝数据移动和Broadcast/aggregation trees,用于数据流中的大量扇入/输出。 此外,现有系统大多采用集中控制器。 重新配置需要大量控制消息,因此会导致单点故障和性能瓶颈。 相反,Chi可以自然地在控制器上分区工作负载。 数据流由并行控制器管理。 数据流控制器进一步拆分以并行操作控制操作。 所有这些控制器都在多个服务器上运行,并在分布式存储上保持状态,这意味着需要高规模的体系结构。

5. APPLICATION EXAMPLES

为了展示我们方法的灵活性,我们演示了使用Chi实现的三个控制应用程序。

Continuous Monitoring. 由于我们工作负载的不可预测性,与传统的批处理系统不同,传统的批处理系统可以离线调整作业以实现最佳性能,因此必须持续监控流量管道并按需优化,以实现高性能和稳健性。 我们展示了使用Chi来持续收集所有Operator的测量数据的示例,这是用于检测和处理系统有趣特征的基础模块,例如overload/underload,数据分布中的skew/drift,间歇性的环境相关瓶颈和缓解散乱的人。 由于页面限制,监视控制操作实现在技术报告中显示。

请注意,控制器不再需要单独ping每个运算符以收集统计信息。 相反,度量标准以可伸缩的树形方式收集和聚合。 §7显示了使用监视控件收集每个连接密钥基数的评估,该基数有助于识别连接空间中的偏度(然后可以用于减少散乱图)。

       Dataflow Reconfiguration. 数据流重新配置对于几个有趣的用例非常重要,例如从给定查询添加/删除运算符,增加运算符的并行度以及利用计算重用。

除了放大/缩小外,还可以执行更复杂的重新配置,包括更改查询计划。例如,图5(a)演示了当我们在流连接查询中发现偏差时,我们如何将查询计划更改为异常的离散器。原来,流A和B使用shuffle join [26]连接,其中Reducer分别从A和B读取数据,并根据连接key

将数据分区并路由到相应的reducer;减少接收数据时,使用相同的key加入元组。由于key空间偏斜,reducer R1接收的数据远多于R2。此时,我们可以通过添加reducers {R3,R4}来更改查询计划,以共享R1的负载。我们的想法是让A(假设A的工作量明显高于B)将R1的负载分配到{R1,R3,R4}; B将R1的负载广播到{R1,R3,R4}。这种重新配置要求R1将其内部维护的数据哈希表从B复制到R3和R4,同时将数据哈希表从A分配并重新分配到R3和R4。

自动参数调整。 大数据系统有许多参数,即使对于经验丰富的工程师也很难调整。 通过在紧密控制回路中对监控和数据流重新配置进行杠杆化,可以将Chi用于自动参数调整,以模拟单个查询的多个实例的A / B测试。 例如,许多现有的流处理系统使用微批处理来在延迟和吞吐量之间进行权衡。 虽然大批量提供了良好的吞吐量,但它会增加延迟。 调整正确的批量大小是一个棘手的问题。 通过Chi的一个解决方案是持续监控数据平面的延迟并在我们看到观察到的延迟相当大的波动时调整批量大小,直到我们获得具有所需延迟的最大吞吐量。

6. IMPLEMENTATION & DISCUSSION

       Distributed runtime.为了展示Chi的性能和灵活性,我们在Flare上实现了它,这是我们团队内部使用的流处理系统。 Flare建立在Orleans [7]  - 虚拟角色框架之上 - 作为运行时和Trill [16]作为运营商级流处理引擎。 通过利用Orleans,Flare实现了分散的可扩展性,特别是:(1)节点可以在不通知主节点的情况下加入/离开集群;(2)演员的生命周期由平台自动管理,超越了内存的生命周期 实例化和特定服务器。

Chi中的Operator具有嵌入到Flare操作符中的堆叠体系结构,该操作符是单线程环境,如图5(b)所示。 (i)通信层准确地提供FIFO,一旦数据通信信道具有模拟TCP的背压。 (ii)消息调度器/多路复用器根据消息类型调用相应的处理模块,并将它们的输出多路复用到通信层。 (iii)数据处理器将Trill管道应用于数据消息。 (iv)控制处理器根据接收到的控制消息调用一系列本地控制动作。 它加载相应的控制配置,管理状态机,并相应地调用用户定义的控制操作。

Flare进一步提供了以下功能来简化控制操作:(i)用户友好的状态管理功能,它将Operator状态建模为Key-Value映射,并可以使用用户定义的分区方案自动分割和合并状态。 关键,(ii)一个查询编译器(可自定义优化规则扩展)类似于Spark的催化剂[4],它将LINQ查询转换为分布式数据流,以及(iii)文件服务器,允许运行时可扩展性,用户可以上传新的控制操作 依赖项并在运行时加载它们。

Custom serialization. 控制消息可以carry/propagate大的有效载荷,包括配置和状态/度量/等来自每个Operator。由于每个Operator的序列化和反序列化可能会带来不必要的开销,因此我们实现了零复制操作,允许我们从字节缓冲区中提取必要的部分(例如,配置,感兴趣的有效负载),而不会对整个消息进行去除。

图6显示了控制消息的结构。每条消息包括三个基本组件:1。元数据字段,用于确保FIFO准确一次传送; 2.配置有效载荷字段以存储不同Operator的配置; 3.Operator插入任何控制专用数据的数据有效载荷,例如,在缩小时重新分区状态。控制消息由控制器生成(在控制指令应用于任何操作符之前)或由Operator生成(在控制操作已触发之后但在控制消息传播到后续操作符之前)。

Portability. 虽然我们在Flare之上实现了我们的方法,以便在我们的内部集群中实现/部署,但我们的设计并不依赖于特定平台。 Chi可以应用于其他系统,只要系统提供至少一次或一次性交付语义的FIFO,以及有序消息处理。 这些适度的要求由大多数现代流媒体系统提供,如Flink [10]和SEEP [11]。 典型的移植计划包括:(1)如果底层系统不提供FIFO准确的一次消息传送,(2)移植消息调度器/多路复用器和控制处理器,以及(3)重用 现有数据处理器。

7. EVALUATION

在本节中,我们使用许多微基准和两个真实基准来评估Chi的性能。 第一个真实世界的基准测试侧重于动态扩展/输出资源,而第二个评估Chi的处理控制和数据故障的能力。 这些结果表明,即使在高数据/控制负载和大型数据流图表下,Chi也会产生可忽略的开销,并且能够快速响应工作负载或故障的变化。 为了展示我们方法的灵活性,我们还通过两个更高级的案例研究报告了实验,即处理倾斜的密钥分发和自动调整以满足SLO。

7.1 Experimental Setup

我们的实验群集包含Azure中的32个DS12v2实例。 每个虚拟机具有4个vCPU,28 GB RAM和10 Gbps网络连接。 我们考虑一个公共工作负载,Yahoo! 流式基准测试(YSB)[42]和一个基于生产跟踪的私有工作负载IPQ1,它由具有复杂窗口聚合和流连接的多阶段查询组成。 YSB在数据处理(字节/事件)方面相当轻量级,而IPQ1在计算上更重(KB /事件)。 正如§6中所解释的,我们在Flare之上实现了Chi,这是一个基于.NET CLR构建的流引擎,由我们的团队在内部使用。 在需要时,我们将比较Drizzle [39],一个Apache Spark v2.0.0(一个BSP风格的引擎)的分支,它带有一个用于流媒体场景的优化调度程序,以及Apache Flink v1.3.2 [10](一个连续的数据流引擎)。 对于我们的所有实验,我们在测量折扣自举效果之前预热JVM和CLR。

Flare performance。为了帮助理解我们用于Chi的底层引擎是否与现有系统竞争,我们将Flare与Flink和Drizzle的基本性能进行比较。在图7中,我们显示了使用YSB和IPQ1时三个系统的吞吐量和延迟的结果。对于吞吐量实验(图7(a)),我们将延迟SLO设置为350毫秒并最大化吞吐量,而对于延迟实验,我们将输入速率固定为2000万 tuples/秒并最小化延迟。按照惯例[39],我们将延迟定义为窗口结束后处理窗口中所有事件所需的时间。例如,如果窗口在时间a结束并且在时间b处理来自窗口的最后一个事件,则该窗口的处理等待时间计算为b-a。 Flink和Drizzle的结果与之前在文献[39,25]中报道的结果一致,并证实Flare的性能与这些系统相当。

7.2 Micro-benchmarks

在本小节中,我们研究了控制平面和数据平面之间的相互作用。 具体来说,我们对Chi的操作开销和可伸缩性方面感兴趣。 为此,我们在三种不同的CPU方案(分别为50%,75%和90%)下改变数据平面负载,并相应地设置两个工作负载YSB和IPQ1的摄取率。 对于计算量大的IPQ1,这导致20,40和60百万个事件/秒(对应于我们的32个服务器集群中平均CPU的56%,73%和94%),而对于通信量大的YSB,这导致 140,180和220万个事件/秒(分别为51%,74%和89%的平均CPU)。 对于控制负载,我们使用阻塞(B-Control)和非阻塞(NB-Control)消息来考虑控制消息的两个实例。 为了准确地隔离Chi的影响并避免使用自定义控制逻辑(例如,CPU密集型用户代码)偏置结果,我们在实验中仅使用NoOp控制消息。

Does control plane affect the data plane? 我们现在评估Chi引入的开销。在图8(a)和8(b)中,我们显示了当从一个控制消息改变控制平面负载时两个工作负载的延迟相对增加(代表数据流管理任务,例如,检查点和重新配置) - 例如,缩放/缩小)到100个控制消息/ s(代表监视任务)。该范围涵盖了我们在生产中观察到的所有控制方案,我们认为绝大多数用例都应该如此。

这些结果表明非阻塞和阻塞控制都具有低开销。这是因为这些控件都不需要全局同步 - 控制事件像数据事件一样以异步方式传播和处理。例如,在计算密集型IPQ1工作负载(6000万事件/秒)的最高负载下,即使对于高控制负载(100条消息/秒),Chi也会对数据平面造成低于20%的延迟损失。这很重要,因为平均CPU利用率已经达到了94%(通过向外扩展可以进一步降低延迟)。正如所料,阻塞控制消息比非阻塞消息更昂贵。这是因为阻塞控制消息需要本地同步,这会暂时阻塞输入通道。

       Does a busy data plane limit the control plane? 接下来,我们研究数据平面负载对控制消息完成时间的影响。关注的是,通过合并控制和数据事件,高数据平面负载可能会对控制消息的性能产生负面影响。为了验证这一点,我们重复实验,并测量阻塞和非阻塞消息的完成时间(见图8(c)和8(d))。作为参考点,我们还显示了数据平面的延迟(图中的“数据延迟”条)。消息等待时间定义为进入系统的消息M的时间戳与M触发的最后一条消息离开系统的时间戳之间的差异。除了排队延迟之外,数据(相应控制)消息的完成时间还取决于实际数据处理的复杂性(分别是控制逻辑)。因此,如果没有积压并且在接收时立即处理消息,则控制消息可以比数据消息更快地完成。

在大多数情况下,与数据消息传递相比,控制消息的完成速度相对较快。此外,当控制负载较低(一个或十个控制消息/ s)时,完成时间相对不受数据平面负载增加的影响。然而,我们观察到,在最高负载(对于YSB分别为2.2亿个事件/秒和对于IPQ1为6000万个事件/秒),控制消息的完成时间与数据延迟相似或更高。这对于阻止消息尤其明显。原因是后者在每个操作员处引入了本地同步,并且在高负载下,数据流中不同路径上的延迟存在较高的可变性(例如,由于工作不平衡)。然而,正如§4.2.2中所讨论的,我们观察到阻塞和非阻塞控制消息在语义上是等效的,尽管它们的实现和执行模型都不同。因此,为了减少完成时间,开发人员总是可以将阻塞控制消息转换为非阻塞版本。

Is the control plane scalable? 如第6节所述,Chi可以通过使用广播(聚合)树在控制器和运营商之间交换控制事件来扩展到具有大量源(接收器)的大型数据流。 为了评估其效果,在图9中,我们显示了控制消息的完成时间,因为我们增加了源的数量。 结果表明,即使对于非常大的数据流图(8,192个源),完成时间也会以对数方式增加并且仍然远低于100 ms。

7.3 Adaptivity and Fault-Tolerance

前面的部分已经表明,Chi会产生较低的开销和完成时间,并且可以扩展到大型数据流图。 在此之后,我们将研究Chi如何利用这些属性来改善使用YSB工作负载对数据平面的工作负载变化(动态弹性)和故障(故障恢复)的适应性。

Dynamic Elasticity. 我们将32服务器群集的输入速率设置为30M元组/秒。 在时间t = 40秒时,我们将输入速率加倍以模拟工作负载峰值。 同时,我们使用各自的策略在所有三个流引擎上启动横向扩展流程。 例如,在Apache Flink中,当横向扩展过程开始时,我们立即调用Savepoint [23]操作。 它外部存储一个独立的检查点,可用于Flink程序的停止和恢复,并使用更大的拓扑重新启动数据流。 我们将结果绘制在图10(a)中:

CHI。 在横向扩展时间(t = 40s),吞吐量没有明显下降,而延迟暂时达到5.8s。 但是,系统会在6秒内快速恢复到稳定状态。

APACHE FLINK。 在t = 40s时,吞吐量降至零(由于Savepoints启动的冻结世界)持续五秒。 相反,处理延迟最高可达35.6秒,重启后需要31秒才能恢复稳定状态。

DRIZZLE。 在t = 40s时,吞吐量没有明显下降,因为Drizzle使用BSP模型,因此不可能连续测量吞吐量。 与Flink类似,处理延迟时间达到6.1秒,并且在稳定之前需要额外的10秒。 值得注意的是,Drizzle在5秒后开始对工作负载峰值作出反应。 这是因为工作负载峰值发生在调度组[39]的中间,系统无法调整数据流拓扑 - 调度组大小越大,它对工作负载变化的反应就越慢。

Fault Tolerance:接下来,我们将检查在Chi之上实现的默认故障恢复操作(第§4.3节)。 与之前的实验一样,我们将摄取率设置为30M元组/秒。 我们还每隔10秒启用所有三个流处理系统的检查点。 在最后一个检查点后5秒的时间t = 40s,我们杀死虚拟机以模拟数据流中的故障,并且我们在所有三个流引擎上开始恢复过程(参见图10(b)):

CHI在失败时(t = 40s),吞吐量没有明显下降,而延迟暂时达到4.3秒。 但是,系统会在5秒内迅速恢复到稳定状态。

APACHE FLINK在t = 40s时,吞吐量降至零5秒(系统停机时间),因为Flink的恢复机制使用最后完成的检查点重新部署整个分布式数据流[22]。 处理延迟达到17.7秒。 然后重新启动后需要13秒才能恢复稳定状态。

DRIZZLE在t = 40s时,由于横向扩展实验中描述的原因,我们观察到吞吐量没有下降。 处理延迟时间达到9.1秒,恢复稳定状态需要11秒。 由于Spark需要重新运行失败批次的线路,因此Spark的恢复机制导致吞吐量下降(在恢复期间)在t = 50s左右。

7.4 Chi in Action

我们通过使用两个真实的复杂控制操作来展示Chi的性能来结束我们的评估,一个侧重于参数自动调整,另一个侧重于由于key值的数据倾斜导致的工作负载skew。

Auto-Tuning for SLOs:流处理系统包括许多用于调整系统行为的参数。 这为用户提供了极大的灵活性,但同时由于参数空间的大尺寸而使其任务大大复杂化。 作为一个代表性的例子,我们将注意力集中在输入数据批量大小上。 流处理系统使用批处理来分摊每元组处理成本。 批次大小直接影响系统性能。 正确识别最优值通常是一项非常重要的任务,如图11(a)所示,其中我们绘制了IPQ1工作负载的延迟和批量大小之间的关系。

为了说明Chi如何帮助正确调整批量大小,我们实现了一个控制操作,包括一个监控任务,它收集延迟信息,再加上一个重新配置任务,更新批量大小以满足之间的期望权衡。 潜伏。 为了说明这是如何工作的,我们展示了一个运行于11(b)的示例,其中我们设置控制操作以优化批量大小,给定输入速率为6,000万个事件/秒,并且上限延迟为500毫秒。 控制器每30秒收集一次延迟样本,更新处理延迟的移动平均值,并在需要时执行优化步骤。

最初(阶段I),控制器选择保守批次大小为40K事件,同时测量延迟和吞吐量。 它很快意识到这个批量大小不足以满足输入速率 - 它压倒了系统导致频繁的背压,这从图中的吞吐量波动中可以清楚地看出。 然后,从30秒标记(阶段II)开始,控制器将批量大小加倍至80K,这导致更稳定的吞吐量,将处理延迟减少到≈500ms。 在60秒标记(阶段III),控制器通过将批量大小加倍到160K来尝试第二个优化步骤,但很快就会检测到这样做,它将无法满足延迟SLO(500毫秒)。 最后它将批量大小恢复为80K(第四阶段)。

       Detecting and Adapting to Workload Skew: 如图1(c)和图1(d)中对生产工作负载的分析所示,对Key进行Join在数据分布中表现出高时间和空间偏差。 这种差异可能导致不平衡,导致落后者,并最终违反SLO。 为了显示这种影响,我们使用了另一种生产工作量IPQ2,它表现出高度的偏斜。 此工作负载是一种流连接查询,可在极大且高吞吐量的实时用户活动日志上进行自联接。 自连接的存在放大了关键偏斜。

与前面的示例一样,我们实现了一个由监视任务和重新配置组成的控制操作,其行为如图5(a)所示。 图11(d)和图11(e)分别显示了重新配置之前和之后的Key分组的分布。 在重新配置之前,一些任务处理大部分Key空间(如分布中的峰值所示),而在重新配置之后,Key空间在任务之间均匀分布。 这对延迟和吞吐量有直接的好处:在重新配置之后,吞吐量增加了26%,而延迟下降了61%(图11(c))。

8. RELATEDWORK

在单节点[16,1,30]和分布式设置[41,14,36,10,39,31,28,3]中已经对流处理进行了充分研究。 在高层次上,我们可以将这些系统的基础分为三类:continuous operator model模型[30,16,28,10,17,36,27]和BSP模型[41,14,3]。 尽管在这两种方法[16,6,5,11]中都在努力改进数据平面,但在改进控制平面方面的工作却相对较少[9,11,10]。 控制平面仍然需要处理分布式系统实现中遇到的常见问题,例如容错,可伸缩性和自适应性。 相反,在Chi中,通过利用数据平面传递控制消息,控制操作可以利用为数据平面引入的相同优化。

现有工作主要集中在基于持续Operator模型的流处理系统中控制平面的特定方面,例如异步检查点算法[9]。 Apache Flink [10]使用freeze-the-world的方法通过保存点和数据流重启来改变阶段的并行性。 Apache Storm [36]提供了非常有限的数据流重新平衡功能 - 由于系统不提供状态Operator支持,因此用户必须手动管理和迁移Operator状态。 这使得用户很难正确地重新平衡有状态的应用程序。

关于控制平面编程的工作量有限。 SEEP [11]提出了一种运算符API,它集成了动态扩展和故障恢复的实现。 SEEP API专注于管理并行性,是Chi在功能方面提供的一个子集。 Chi允许更灵活的控制操作,例如,解决数据偏差和连续监控。 此外,由于广泛的一致性模型,Chi支持一般控制操作。 相反,SEEP仅提供有限的操作集,因为其控制信号不同步。 此外,Chi通过自动执行复杂任务简化了控制平面编程。 必须在SEEP中手动实施和管理这些任务。 最近,Dhalion [24]提出了一种控制策略抽象,描述了检测到的异常的症状和解决方案。 如第§4.4节所述,Dhalion的政策可以在Chi之上实施。

流处理系统广泛采用分割符号。它们被引入用于分割连续流[37]以支持具有无约束状态的查询,例如分组和连接。标点符号缺少同步机制,这对于支持需要全局一致性保证的任何高级控制操作至关重要。 Aurora / Medusa [18]和继承人Borealis [8]是开创性的工作,探索分布式流处理中的查询自适应。 Borealis使用控制线来修改流查询,但是它们的同步需要由开发人员小心处理,这在分布式设置中是一项非常重要的任务。此外,控制线仅限于修改查询参数。 Esmaili等。 [35]使用非同步标点来修改连续查询,因此仅支持单输入和输出流。最近的流媒体系统,如Flink [10],MillWheel [2]和GigaScope [20],主要采用标点符号来检查维护任务,如检查点障碍,冲洗早期结果和检查心跳。他们的标点符号机制不能支持Chi的一般控制操作。

BSP流处理系统[38]很自然地在同步Barrier下重新配置数据流。 但是,由于对延迟和吞吐量的负面影响,Barrier的概念可能对流式工作负载不利。 因此,已经提出了几种技术来减轻BSP中同步引入的开销[41,39]。 例如,Drizzle [39]研究了基于快速适应性BSP的流媒体系统的调度方法,如何使用群组调度和预调度来减少延迟,但仍然为突然的环境变化提供合理的适应性。 由于Chi主要关注的是将控制嵌入到数据平面中,因此我们的工作是对这些工作的补充。

离线数据系统在运行时期间对重新配置也有很强的要求。 Chandramouli等。 [15]研究了检查点和恢复连续数据库查询的最佳执行计划。 最近,S-Store [12]在事务数据库之上添加了流语义。 可以在交易之间应用控制; 但是,系统遭受与BSP系统类似的同步开销。 为了支持新兴的强化学习算法,Project

Ray [29]为大型动态数据流开发了一个任务调度框架。 在Chi中,任务调度框架由Orans提供,Chi负责提供用于定制控制操作和分布式执行机制的API。 一个可能的未来方向是将Chi移植到Ray上。 这将增强Ray的人工智能和流处理系统作业的持续监控和重新配置功能。

9. Conclusion

Chi采用原则性方法来控制数据流。 Chi不是使用单独的控制平面通道,而是在数据平面上传播控制消息和数据消息。 这样可以支持重要的处理系统要求,例如零系统停机时间和频繁的重新配置,而不会影响易用性。 Chi对生产工作负载的实施和验证不仅提供了验证设计选择的见解,而且提供了有价值的工程经验,这对于这种云规模控制平面的成功至关重要。

一周一论文(翻译)——[VLDB 18] Chi:分布式流处理系统下可扩展的、可编程的控制计划模块相关推荐

  1. python流处理框架_Python操作分布式流处理系统Kafka

    什么是Kafka Kafka是一个分布式流处理系统,流处理系统使它可以像消息队列一样publish或者subscribe消息,分布式提供了容错性,并发处理消息的机制. Kafka的基本概念 kafka ...

  2. Apache shiro集群实现 (六)分布式集群系统下的高可用session解决方案---Session共享

    Apache shiro集群实现 (一) shiro入门介绍 Apache shiro集群实现 (二) shiro 的INI配置 Apache shiro集群实现 (三)shiro身份认证(Shiro ...

  3. 论文翻译:DeepFaceLab:一个简单,灵活的可扩展换脸框架

    DeepFaceLab:一个简单,灵活的可扩展换脸框架 时间有限,翻译仓促,为个人学习所用,仅供参考. DeepFaceLab: A simple, flexible and extensible f ...

  4. 一周一论文(翻译)——[VLDB 19] Minimizing Cost by Reducing Scaling Operators in Distributed Stream Processing

    Abstract 弹性分布式流处理系统能够动态地适应工作负载的变化.通常,这些系统通过向上或向下扩展来对输入数据的速率或资源利用水平做出反应.目标是优化系统的资源使用,从而降低其运营成本.但是,这种扩 ...

  5. 一周一论文(翻译)——[SIGMOD 19] Elasticutor:Rapid Elasticity for Realtime Stateful Stream Processing

    Abstract 弹性非常适用于流系统,以保证针对工作负载动态的低延迟,例如到达率的激增和数据分布的波动.现有系统使用以resource-centric的方法实现弹性,该方法在并行实例(即执行程序)之 ...

  6. 一周一论文(翻译)——[PVLDB 17] Dhalion: 基于Heron自适应调整的流处理系统

    Abstract 近年来,大规模实时分析需求激增,并且已开发出大量流处理系统来支持此类应用. 即使遇到硬件和软件故障,这些系统也能够继续进行流处理. 然而,这些系统并未解决其Operator面临的一些 ...

  7. 在大数据和人工智能时代,我们是否需要分布式流处理?

    在中国大数据和人工智能时代,许多数据密集型应用程序表现出传统批处理模型无法满足的要求.流媒体应用,如流分析,物联网数据处理,网络监控,或金融欺诈检测,必须支持高处理率,但始终达到亚秒级处理延迟.作为响 ...

  8. 一周一论文(翻译 总结)—— [SOSP 18] LITE Kernel RDMA Support for Datacenter Applications : 一个LITE 内核支持的RDMA通信库

    目录 Abstract 1. Introduction 2. BACKGROUND AND ISSUES OF RDMA 2.1 Background on RDMA 2.2 RDMA in Data ...

  9. 一周一论文(翻译 总结)—— [NSDI 17] TUX2: Distributed Graph Computation for Machine Learning 面向机器学习的分布式图处理系统

    1. Introduce 在图形引擎(如GraphLab [29])上的早期工作是基于机器学习的动机,基于观察到许多机器学习问题可以用图形自然而有效地建模,并通过迭代收敛算法解决.         问 ...

最新文章

  1. seo优化源码_seo按天计费系统,无需登陆批量查询关键词价格
  2. QT的QLatin1String类的使用
  3. aspmysql发布_ASP.NET Entity Framework with MySql服务器发布环境配置
  4. Light bulbs(上海icpc网络预选赛,差分离散化)
  5. 为什么Netty这么火?与Mina相比有什么优势?
  6. python爬虫的具体流程_[专栏作家]【Python】爬虫程序 (一)
  7. win10磁盘管理教程
  8. Python恶搞搞机程序弹窗
  9. HDU-4126(Genghis Khan the Conqueror)
  10. android 背景毛玻璃模糊化效果实现方法
  11. KEIL5护眼背景色以及字体颜色
  12. 深信服 2019校园招聘 研发试卷-2018.09.21
  13. 抗疫行动题材网页设计 大学生最美逆行者感动人物网页代码 众志成城万众一心抗击疫情HTML网页设计
  14. 基于计数栈的非递归二叉树遍历算法
  15. 使用kolla-ansible部署多节点OpenStack(T版)及对接Ceph
  16. 关于NeteaseCloudMusicApi接口文档
  17. CAD Electrical 如何设置原理图库为GB
  18. Elasticsearch宕机问题
  19. 使用小程序制作一个老照片修复工具,让追忆时光触手可及
  20. 读书 -- 《天才在左 疯子在右》

热门文章

  1. PYTHON爬取66影视的电影下载链接,有搜索功能
  2. 利用velocity.js将svg动起来
  3. 算法与数据结构--数组和链表的区别
  4. (转载)php array_merge 和 两数组相加区别
  5. 树形控件Tree Control
  6. loadrunner 录制 odbc 迭代出现lrd_db_option: ERROR, return-code=LRDE2009错误
  7. COM Surrogate 遇到问题需要关闭。我们对此引起的不便表示抱歉
  8. windows 常用工具
  9. 试题集 - 算法与编程
  10. 卫星定位导航行业的产业链