1 简介

SparkStreaming是Spark核心API的一个扩展,具有高吞吐量和容错能力的实时流数据处理系统,可以对多种数据源(如Kdfka、Flume、Twitter、Zero和TCP 套接字)进行类似Map、Reduce和Join等复杂操作,并将结果保存到外部文件系统、数据库或应用到实时仪表盘。

Spark Streaming在内部处理的机制原理是:先接受实时流的数据,并根据一定的时间间隔拆分成一批批的数据,这些批数据在Spark内核对应一个RDD实例,因此,流数据的DStream可以看成一组RDDs,然后通过调用Spark核心的作业处理这些批数据,最终得到处理后的一批批结果数据。

Spark Streaming的具体工作原理如下:

2. 术语定义

2.1 离散流DStream

Spark Streaming 提供了一种高级的抽象,叫做 DStream,英文全称为 Discretized Stream,中文翻译为“离散流”,它代表了一个持续不断的数据流。DStream可以通过输入数据源来创建,比如Kafka、Flume和Kinesis;也可以通过对其他DStream应用高阶函数来创建,比如map、reduce、join、window。

DStream的内部,其实一系列持续不断产生的RDD。RDD是Spark Core的核心抽象,即,不可变的,分布式的数据集。DStream中的每个RDD都包含了一个时间段内的数据。

对DStream应用的算子,比如map,其实在底层会被翻译为对DStream中每个RDD的操作。比如对一个DStream执行一个map操作,会产生一个新的DStream。但是,在底层,其实其原理为,对输入DStream中每个时间段的RDD,都应用一遍map操作,然后生成的新的RDD,即作为新的DStream中的那个时间段的一个RDD。底层的RDD的transformation操作,其实,还是由Spark Core的计算引擎来实现的。Spark Streaming对Spark Core进行了一层封装,隐藏了细节,然后对开发人员提供了方便易用的高层次的API。

2.2 Dstream Graph

Spark Streaming中作业的生成与Spark Core类似,对DStream的各种操作让他们之间建立起依赖关系,当遇到DStream使用输出操作时,对这些依赖关系以及它们之间的操作会被记录到名为DStreamGraph的对象中表示一个作业。这些作业注册到DstreamGraph中并不会立即被运行,而是等到Spark Streaming启动后,达到批处理时间时,才根据DStreamGraph生成作业处理该批处理时间内接受的数据。

2.3 批处理间隔

在Spark Streaming中,数据采集是逐条进行的,而数据处理时按照批次进行的,因此在Spark Streaming中会先设置批处理间隔(batch duration)。

2.4 窗口间隔(Window Duration)和滑动间隔(Slide Duration)

对于窗口操作而言,其窗口内部会有N个批处理数据,批处理数据的个数是由窗口间隔决定的,其为窗口持续的时间,在窗口操作中只有窗口间隔满足了才会触发批处理数据的处理。除了窗口的长度,另一个重要参数就是滑动间隔(Slide Duration),它指的是经过多长时间窗口滑动一次,形成新的窗口,滑动窗口默认为情况下和批处理间隔相同,而窗口间隔一般设置地比他们两个都大。

3. Spark Streaming 特点

3.1 流式处理

Spark Streaming是将流式计算分解成一系列短小的批处理作业。

3.2 高容错

对于流式计算来说,容错性至关重要,首先我们要明确一下SparkRDD的容错机制。每一个RDD都是一个不可变的分布式可重新计算的数据集,其记录着确定性的操作“血统”(lineage),所以只要输入数据是可以容错的,那么任意一个RDD的分区(partition)出错或者不可用,都是可以利用输入数据通过转换操作而重新计算的。

3.3 低延迟

对于目前版本的Spark Streaming而言,其最小的Batch Size的选择在0.5~2s之间。

3.4 吞吐量高

4. Spark Streaming 编程模型

4.1 DStream的输入源

在Spark Streaming中所有的操作都是基于流的,而输入源是这一系列操作的起点。输入 DStreams 和 DStreams 接收的流都代表输入数据流的来源,在Spark Streaming 提供两种内置数据流来源:

  • 基础来源 在 StreamingContext API 中直接可用的来源。例如:文件系统、Socket(套接字)连接和 Akka actors;
  • 高级来源 如 Kafka、Flume、Kinesis、Twitter 等,可以通过额外的实用工具类创建。

4.2 DStream 的操作

与RDD类似,DStream也提供了自己的一系列操作方法,这些操作可以分成三类:普通的转换操作、窗口转换操作和输出操作。

4.2.1 普通的转换操作

转换 描述
map(func) 源 DStream的每个元素通过函数func返回一个新的DStream。
flatMap(func) 类似与map操作,不同的是每个输入元素可以被映射出0或者更多的输出元素。
filter(func) 在源DSTREAM上选择Func函数返回仅为true的元素,最终返回一个新的DSTREAM 。
flatMap(func) 类似与map操作,不同的是每个输入元素可以被映射出0或者更多的输出元素。
repartition(numPartitions) 通过输入的参数numPartitions的值来改变DStream的分区大小。
union(otherStream) 返回一个包含源DStream与其他 DStream的元素合并后的新DSTREAM。
count() 对源DStream内部的所含有的RDD的元素数量进行计数,返回一个内部的RDD只包含一个元素的DStreaam。
reduce(func) 使用函数func(有两个参数并返回一个结果)将源DStream 中每个RDD的元素进行聚 合操作,返回一个内部所包含的RDD只有一个元素的新DStream。
countByValue() 计算DStream中每个RDD内的元素出现的频次并返回新的DStream[(K,Long)],其中K是RDD中元素的类型,Long是元素出现的频次。
reduceByKey(func, [numTasks]) 当一个类型为(K,V)键值对的DStream被调用的时候,返回类型为类型为(K,V)键值对的新 DStream,其中每个键的值V都是使用聚合函数func汇总。注意:默认情况下,使用 Spark的默认并行度提交任务(本地模式下并行度为2,集群模式下位8),可以通过配置numTasks设置不同的并行任务数。
join(otherStream, [numTasks]) 当被调用类型分别为(K,V)和(K,W)键值对的2个DStream 时,返回类型为(K,(V,W))键值对的一个新 DSTREAM。
cogroup(otherStream, [numTasks]) 当被调用的两个DStream分别含有(K, V) 和(K, W)键值对时,返回一个(K, Seq[V], Seq[W])类型的新的DStream。
map(func) 源 DStream的每个元素通过函数func返回一个新的DStream。
transform(func) 通过对源DStream的每RDD应用RDD-to-RDD函数返回一个新的DStream,这可以用来在DStream做任意RDD操作。
updateStateByKey(func) 返回一个新状态的DStream,其中每个键的状态是根据键的前一个状态和键的新值应用给定函数func后的更新。这个方法可以被用来维持每个键的任何状态数据。

在上面列出的这些操作中,transform()方法和updateStateByKey()方法值得我们深入的探讨一下:

  • transform(func)操作

该transform操作(转换操作)连同其其类似的 transformWith操作允许DStream 上应用任意RDD-to-RDD函数。它可以被应用于未在 DStream API 中暴露任何的RDD操作。例如,在每批次的数据流与另一数据集的连接功能不直接暴露在DStream API 中,但可以轻松地使用transform操作来做到这一点,这使得DStream的功能非常强大。例如,你可以通过连接预先计算的垃圾邮件信息的输入数据流(可能也有Spark生成的),然后基于此做实时数据清理的筛选,如下面官方提供的伪代码所示。事实上,也可以在transform方法中使用机器学习和图形计算的算法。

  • updateStateByKey操作

该 updateStateByKey 操作可以让你保持任意状态,同时不断有新的信息进行更新。要使用此功能,必须进行两个步骤 :

(1) 定义状态 - 状态可以是任意的数据类型。

(2) 定义状态更新函数 - 用一个函数指定如何使用先前的状态和从输入流中获取的新值 更新状态。

让我们用一个例子来说明,假设你要进行文本数据流中单词计数。在这里,正在运行的计数是状态而且它是一个整数。我们定义了更新功能如下:

此函数应用于含有键值对的DStream中(如前面的示例中,在DStream中含有(word,1)键值对)。它会针对里面的每个元素(如wordCount中的word)调用一下更新函数,newValues是最新的值,runningCount是之前的值。

4.2.2 窗口转换操作

Spark Streaming 还提供了窗口的计算,它允许你通过滑动窗口对数据进行转换,窗口转换操作如下:

转换 描述
window(windowLength, slideInterval) 返回一个基于源DStream的窗口批次计算后得到新的DStream。
countByWindow(windowLength,slideInterval) 返回基于滑动窗口的DStream中的元素的数量。
reduceByWindow(func, windowLength,slideInterval) 基于滑动窗口对源DStream中的元素进行聚合操作,得到一个新的DStream。
reduceByKeyAndWindow(func,windowLength, slideInterval, [numTasks]) 基于滑动窗口对(K,V)键值对类型的DStream中的值按K使用聚合函数func进行聚合操作,得到一个新的DStream。
reduceByKeyAndWindow(func, invFunc,windowLength, slideInterval, [numTasks]) 一个更高效的reduceByKkeyAndWindow()的实现版本,先对滑动窗口中新的时间间隔内数据增量聚合并移去最早的与新增数据量的时间间隔内的数据统计量。例如,计算t+4秒这个时刻过去5秒窗口的WordCount,那么我们可以将t+3时刻过去5秒的统计量加上[t+3,t+4]的统计量,在减去[t-2,t-1]的统计量,这种方法可以复用中间三秒的统计量,提高统计的效率。
countByValueAndWindow(windowLength,slideInterval, [numTasks]) 基于滑动窗口计算源DStream中每个RDD内每个元素出现的频次并返回DStream[(K,Long)],其中K是RDD中元素的类型,Long是元素频次。与countByValue一样,reduce任务的数量可以通过一个可选参数进行配置。

4.2.3 输出操作

Spark Streaming允许DStream的数据被输出到外部系统,如数据库或文件系统。由于输出操作实际上使transformation操作后的数据可以通过外部系统被使用,同时输出操作触发所有DStream的transformation操作的实际执行(类似于RDD操作)。以下表列出了目前主要的输出操作:

转换 描述
print() 在Driver中打印出DStream中数据的前10个元素。
saveAsTextFiles(prefix, [suffix]) 将DStream中的内容以文本的形式保存为文本文件,其中每次批处理间隔内产生的文件以prefix-TIME_IN_MS[.suffix]的方式命名。
saveAsObjectFiles(prefix, [suffix]) 将DStream中的内容按对象序列化并且以SequenceFile的格式保存。其中每次批处理间隔内产生的文件以prefix-TIME_IN_MS[.suffix]的方式命名。
saveAsHadoopFiles(prefix, [suffix]) 将DStream中的内容以文本的形式保存为Hadoop文件,其中每次批处理间隔内产生的文件以prefix-TIME_IN_MS[.suffix]的方式命名。
foreachRDD(func) 最基本的输出操作,将func函数应用于DStream中的RDD上,这个操作会输出数据到外部系统,比如保存RDD到文件或者网络数据库等。需要注意的是func函数是在运行该streaming应用的Driver进程里执行的。

5. 容错、持久化和性能调优

5.1 容错

DStream基于RDD组成,RDD的容错性依旧有效,我们首先回忆一下SparkRDD的基本特性。

  • RDD是一个不可变的、确定性的可重复计算的分布式数据集。RDD的某些partition丢失了,可以通过血统(lineage)信息重新计算恢复;
  • 如果RDD任何分区因worker节点故障而丢失,那么这个分区可以从原来依赖的容错数据集中恢复;
  • 由于Spark中所有的数据的转换操作都是基于RDD的,即使集群出现故障,只要输入数据集存在,所有的中间结果都是可以被计算的。

Spark Streaming是可以从HDFS和S3这样的文件系统读取数据的,这种情况下所有的数据都可以被重新计算,不用担心数据的丢失。但是在大多数情况下,Spark Streaming是基于网络来接受数据的,此时为了实现相同的容错处理,在接受网络的数据时会在集群的多个Worker节点间进行数据的复制,通过RDD设置默认存储级别为Memroy_AND_DISK_2(默认的复制数是2),这导致产生在出现故障时被处理的两种类型的数据:

1)Data received and replicated :一旦一个Worker节点失效,系统会从另一份还存在的数据中重新计算。

2)Data received but buffered for replication :一旦数据丢失,可以通过RDD之间的依赖关系,从HDFS这样的外部文件系统读取数据。

此外,有两种故障,我们应该关心:

(1)Worker节点失效:通过上面的讲解我们知道,这时系统会根据出现故障的数据的类型,选择是从另一个有复制过数据的工作节点上重新计算,还是直接从从外部文件系统读取数据。

(2)Driver(驱动节点)失效 :如果运行 Spark Streaming应用时驱动节点出现故障,那么很明显的StreamingContext已经丢失,同时在内存中的数据全部丢失。对于这种情况,Spark Streaming应用程序在计算上有一个内在的结构——在每段micro-batch数据周期性地执行同样的Spark计算。这种结构允许把应用的状态(批次数据的元数据信息,亦称checkpoint)周期性地保存到可靠的存储空间中,并在driver重新启动时恢复该状态。具体做法是在ssc.checkpoint(<checkpoint directory>)函数中进行设置,Spark Streaming就会定期把DStream的元信息写入到HDFS中,一旦驱动节点失效,丢失的StreamingContext会通过已经保存的检查点信息进行恢复。

5.2 预写日志 WriteAheadLogs

从Spark Streaming 1.2 版本开始引入了预写日志的功能(WriteAheadLogs)。实时流处理系统必须要能在24/7时间内工作,因此它需要具备从各种系统故障中恢复过来的能力。最开始,SparkStreaming就支持从driver和worker故障恢复的能力。然而有些数据源的输入可能在故障恢复以后丢失数据。在Spark1.2版本中,Spark已经在SparkStreaming中对预写日志(也被称为journaling)作了初步支持,改进了恢复机制,并使更多数据源的零数据丢失有了可靠。

对于文件这样的源数据,driver恢复机制足以做到零数据丢失,因为所有的数据都保存在了像HDFS或S3这样的容错文件系统中了。但对于像Kafka和Flume等其它数据源,有些接收到的数据还只缓存在内存中,尚未被处理,它们就有可能会丢失。这是由于Spark应用的分布操作方式引起的。当driver进程失败时,所有在standalone/yarn/mesos集群运行的executor,连同它们在内存中的所有数据,也同时被终止。对于Spark Streaming来说,从诸如Kafka和Flume的数据源接收到的所有数据,在它们处理完成之前,一直都缓存在executor的内存中。纵然driver重新启动,这些缓存的数据也不能被恢复。为了避免这种数据损失,在Spark1.2发布版本中引进了预写日志(WriteAheadLogs)功能。

在一个Spark Streaming应用开始时(也就是Driver开始)。相关的Streaming Context(所有流功能的基础功能)使用SparkContext 启动接收器成为长驻运行任务。这些接收器接受并保存数据到Spark内存中一共处理。用户传输数据的生命周期如下图所示:

(1)接受数据:接收器将数据分成一系列小块,存储到Executor内存或者磁盘中,如果启动了预写日志,数据同时还写入到容错文件系统的预写日志文件中。

(2)通知StreamContext:接受块的元数据(Meatdata)被发送到Driver的StreamingContext。1.这个元数据包括:定位其在executor内存或者磁盘中数据位置的块信息。2.块数据在日志文件中的偏移信息。如果启动了预写日志,数据同时还写入到容错文件系统的预写日志文件中。

(3)处理数据:每批数据的间隔,流上下文使用块信息产生弹性分布式数据集RDD和他们的作业Job,StreamingContext通过运行任务处理Executor内存或者磁盘中的数据块执行作业。

(4)周期性的设置检查点:为了恢复的需要,流计算(即StreamingContext)提供来的DStream)周期性的设置检查点,并保存到同一个容错文件系统的另外一组文件中。

当一个失败的Driver端重启的时候,会进行如下处理:

(1)恢复计算:使用检查点信息重启Driver,重新构造上下文重启接收器。

(2)恢复元数据:为了保证能够继续下去所必备的全部元数据块都被恢复。

(3)未完成作业的重新生成:由于失败而没有处理完成的批处理,将使用会的元数据再次产生RDD和对应的作业。

(4)读取保存在日志中的块数据:在这些作业执行时,块数据之间从预写日志中读出,这将恢复在日志中可靠地保存所有必要的数据。

(5)重发尚未确认的数据:失败时没有保存到日志中的缓存数据将由数据源再次发送。

6. 持久化

与RDD类似,Spark Streaming也可以让开发人员手动控制,将数据流中的数据持久化到内存中。对DStream调用persist()方法,就可以让Spark Streaming自动将该数据流中的所有产生的RDD,都持久化到内存中。如果要对一个DStream多次执行操作,那么,对DStream持久化是非常有用的。因为多次操作,可以共享使用内存中的一份缓存数据。

对于基于窗口的操作,比如reduceByWindow、reduceByKeyAndWindow,以及基于状态的操作,比如updateStateByKey,默认就隐式开启了持久化机制。即Spark Streaming默认就会将上述操作产生的Dstream中的数据,缓存到内存中,不需要开发人员手动调用persist()方法。

对于通过网络接收数据的输入流,比如socket、Kafka、Flume等,默认的持久化级别,是将数据复制一份,以便于容错。相当于是MEMORY_ONLY_SER_2。

与RDD不同的是,默认的持久化级别,统一都是要序列化的。

7. 性能调优

7.1 优化运行时间

  • 增加并行度 确保使用整个集群的资源,而不是把任务集中在几个特定的节点上。对于包含shuffle的操作,增加其并行度以确保更为充分地使用集群资源;

  • 减少数据序列化,反序列化的负担 Spark Streaming默认将接受到的数据序列化后存储,以减少内存的使用。但是序列化和反序列话需要更多的CPU时间,因此更加高效的序列化方式(Kryo)和自定义的系列化接口可以更高效地使用CPU;

  • 设置合理的batch duration(批处理时间间) 在Spark Streaming中,Job之间有可能存在依赖关系,后面的Job必须确保前面的作业执行结束后才能提交。若前面的Job执行的时间超出了批处理时间间隔,那么后面的Job就无法按时提交,这样就会进一步拖延接下来的Job,造成后续Job的阻塞。因此设置一个合理的批处理间隔以确保作业能够在这个批处理间隔内结束时必须的;

  • 减少因任务提交和分发所带来的负担 通常情况下,Akka框架能够高效地确保任务及时分发,但是当批处理间隔非常小(500ms)时,提交和分发任务的延迟就变得不可接受了。使用Standalone和Coarse-grained Mesos模式通常会比使用Fine-grained Mesos模式有更小的延迟。

7.2 优化内存使用

  • 控制batch size(批处理间隔内的数据量) Spark Streaming会把批处理间隔内接收到的所有数据存放在Spark内部的可用内存区域中,因此必须确保当前节点Spark的可用内存中少能容纳这个批处理时间间隔内的所有数据,否则必须增加新的资源以提高集群的处理能力;

  • 及时清理不再使用的数据 前面讲到Spark Streaming会将接受的数据全部存储到内部可用内存区域中,因此对于处理过的不再需要的数据应及时清理,以确保Spark Streaming有富余的可用内存空间。通过设置合理的spark.cleaner.ttl时长来及时清理超时的无用数据,这个参数需要小心设置以免后续操作中所需要的数据被超时错误处理;

  • 观察及适当调整GC策略 GC会影响Job的正常运行,可能延长Job的执行时间,引起一系列不可预料的问题。观察GC的运行情况,采用不同的GC策略以进一步减小内存回收对Job运行的影响。

Spark详解(十二):Spark Streaming原理和实现相关推荐

  1. Linux内核Thermal框架详解十二、Thermal Governor(2)

    本文部分内容参考 万字长文 | Thermal框架源码剖析, Linux Thermal机制源码分析之框架概述_不捡风筝的玖伍贰柒的博客-CSDN博客, "热散由心静,凉生为室空" ...

  2. Spark详解(二):Spark完全分布式环境搭建

    1. 前言 本文搭建了一个由三节点(master.slave1.slave2)构成的Spark完全分布式集群,并通过Spark分布式计算的一个示例测试集群的正确性.本文将搭建一个支持Yarn的完全分布 ...

  3. 攻防世界杂项(misc)--新手练习区(详解十二道题完结,附件做题过程中使用到的各种工具和网站)

    攻防世界杂项(misc)–新手练习区(详解) 第一题:this_is_flag 题目描述:Most flags are in the form flag{xxx}, for example:flag{ ...

  4. JavaScript 各种参数 详解(十二)

    程序代码 ' *---------------------------------------------------------------------------- ' * 函数:CheckIn ...

  5. MFS详解(二)——MFS原理和架构

    今天继续给大家介绍Linux运维相关知识,本文主要内容是MFS介绍. 一.MFS架构 MFS的架构如下图所示: 二.MFS组件 在上图中,可以看出,MFS有以下三大组件: 1.Master Maste ...

  6. 数字音频总线A2B开发详解十二(A2B一Master板做音效处理-31段EQ,高中低音分频等)

    作者的话 从板B上,我们把Master板上直通过来的音频信号,通过板子上的ADAU1761进行调音,可以让每一块从板都发出自己的声音,那么可不可以从源头,我们在Master上就把声音分配好,高音你去B ...

  7. Spark SQL原理及常用方法详解(二)

    Spark SQL 一.Spark SQL基础知识 1.Spark SQL简介 (1)简单介绍 (2)Datasets & DataFrames (3)Spark SQL架构 (4)Spark ...

  8. Spark RDD 论文详解(二)RDDs

    前言 本文隶属于专栏<1000个问题搞定大数据技术体系>,该专栏为笔者原创,引用请注明来源,不足和错误之处请在评论区帮忙指出,谢谢! 本专栏目录结构和参考文献请见1000个问题搞定大数据技 ...

  9. Spark 3.2.0 版本新特性 push-based shuffle 论文详解(二)背景和动机

    前言 本文隶属于专栏<大数据技术体系>,该专栏为笔者原创,引用请注明来源,不足和错误之处请在评论区帮忙指出,谢谢! 本专栏目录结构和参考文献请见大数据技术体系 目录 Spark 3.2.0 ...

  10. 即时通讯音视频开发(十八):详解音频编解码的原理、演进和应用选型

    1.引言 大家好,我是刘华平,从毕业到现在我一直在从事音视频领域相关工作,也有一些自己的创业项目,曾为早期Google Android SDK多媒体架构的构建作出贡献. 就音频而言,无论是算法多样性, ...

最新文章

  1. dp背包九讲(待补充,暂时前两讲)
  2. Java AQS论文翻译
  3. codeforces 667A A. Pouring Rain(水题)
  4. Android热补丁之Robust(三)坑和解
  5. Linux下的第一个驱动程序
  6. 网络流媒体协议之——RTSP协议
  7. JDK源码解析之 Java.lang.Boolean
  8. python未将对象引用设置到对象的实例_在Python中使用pingarapi。服务器引发Webfault:对象引用未设置为对象的实例...
  9. 计算机二级证学的什么,考计算机二级证需要学什么
  10. Go 触发 GC 的时机有哪些?能手动触发吗?
  11. td onmouseover=this.style.cursor='hand' onmouseout=this.style.cursor='normal' 小手状
  12. 看看你的密码有多安全?
  13. flutter压缩图片上传
  14. Drawable的setBounds方法
  15. STM32单片机的学习方法(方法大体适用所有开发版入门)
  16. 什么是搜索引擎优化(SEO)
  17. linux键盘控制鼠标软件下载,手机控制电脑软件(Mouse Server)
  18. java添加窗体中_java中利用JFrame创建窗体 【转】
  19. Pinbox 网络收藏夹使用指南
  20. 用C语言复现用贪吃蛇游戏(二)完结

热门文章

  1. 针对maven install 报错:Failed to execute goal org.apache.maven.plugins:maven-compiler-plugin:3.1 解决方案...
  2. js实现元素水平垂直居中
  3. the Determine in June
  4. hdu 4607 Park Visit 求树的直径
  5. android系统学习笔记十一
  6. 用Delphi设计能携带附件的EMail
  7. 3.什么叫堆排序?与快速排序有什么不同?
  8. Python3编程语言之enumerate() 函数使用示例
  9. 阿里云学生计划领取攻略
  10. 抖音测试心理是什么软件,实用心理测试大全抖音版-抖音实用心理测试大全小游戏官方版预约 v1.0-友情手机站...